// open_agent/client.rs

//! Client for streaming queries and multi-turn conversations
//!
//! This module provides the core streaming client implementation for the Open Agent SDK.
//! It handles communication with OpenAI-compatible APIs, manages conversation history,
//! and provides two modes of operation: manual and automatic tool execution.
//!
//! # Architecture Overview
//!
//! The SDK implements a **streaming-first architecture** where all responses from the model
//! are received as a stream of content blocks. This design enables:
//!
//! - **Progressive rendering**: Display text as it arrives without waiting for completion
//! - **Real-time tool execution**: Execute tools as they're requested by the model
//! - **Interruption support**: Cancel operations mid-stream without corrupting state
//! - **Memory efficiency**: Process large responses without buffering everything in memory
//!
//! ## Two Operating Modes
//!
//! ### 1. Manual Tool Execution Mode (default)
//!
//! In manual mode, the client streams content blocks directly to the caller. When the model
//! requests a tool, the caller receives a `ToolUseBlock`, executes the tool, adds the result
//! using `add_tool_result()`, and continues the conversation with another `send()` call.
//!
//! **Use cases**: Custom tool execution logic, interactive debugging, fine-grained control
//!
//! ### 2. Automatic Tool Execution Mode
//!
//! When `auto_execute_tools` is enabled, the client automatically executes tools and continues
//! the conversation until receiving a text-only response. The caller only receives the final
//! text blocks after all tool iterations complete.
//!
//! **Use cases**: Simple agentic workflows, automated task completion, batch processing
//!
//! ## Request Flow
//!
//! ```text
//! User sends prompt
//!     │
//!     ├─> UserPromptSubmit hook executes (can modify/block prompt)
//!     │
//!     ├─> Prompt added to history
//!     │
//!     ├─> HTTP request to OpenAI-compatible API
//!     │
//!     ├─> Response streamed as Server-Sent Events (SSE)
//!     │
//!     ├─> SSE chunks aggregated into ContentBlocks
//!     │
//!     └─> Blocks emitted to caller (or buffered for auto-execution)
//! ```
//!
//! ## Tool Execution Flow
//!
//! ### Manual Mode
//! ```text
//! receive() → ToolUseBlock
//!     │
//!     ├─> Caller executes tool
//!     │
//!     ├─> Caller calls add_tool_result()
//!     │
//!     ├─> Caller calls send("") to continue
//!     │
//!     └─> receive() → TextBlock (model's response)
//! ```
//!
//! ### Auto Mode
//! ```text
//! receive() triggers auto-execution loop
//!     │
//!     ├─> Collect all blocks from stream
//!     │
//!     ├─> For each ToolUseBlock:
//!     │   ├─> PreToolUse hook executes (can modify/block)
//!     │   ├─> Tool executed via Tool.execute()
//!     │   ├─> PostToolUse hook executes (can modify result)
//!     │   └─> Result added to history
//!     │
//!     ├─> Continue conversation with send("")
//!     │
//!     ├─> Repeat until text-only response or max iterations
//!     │
//!     └─> Return text blocks one-by-one via receive()
//! ```
//!
//! ## State Management
//!
//! The client maintains several pieces of state:
//!
//! - **history**: Full conversation history (`Vec<Message>`)
//! - **current_stream**: Active SSE stream being consumed (`Option<ContentStream>`)
//! - **interrupted**: Atomic flag for cancellation (`Arc<AtomicBool>`)
//! - **auto_exec_buffer**: Buffered blocks for auto-execution mode (`Vec<ContentBlock>`)
//! - **auto_exec_index**: Current position in buffer (`usize`)
//!
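//! For example, a long-lived client's history can be reset between unrelated
//! conversations. A minimal sketch, assuming the `clear_history()` method
//! referenced in the field docs below takes no arguments:
//!
//! ```ignore
//! let mut client = Client::new(AgentOptions::default())?;
//!
//! client.send("First topic").await?;
//! while let Some(_block) = client.receive().await? {}
//!
//! // Drop accumulated messages so the next conversation starts fresh.
//! client.clear_history();
//! client.send("Unrelated second topic").await?;
//! ```
//!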
//! ## Interruption Mechanism
//!
//! The interrupt system uses an `Arc<AtomicBool>` to enable thread-safe cancellation:
//!
//! ```rust,no_run
//! # use open_agent::{Client, AgentOptions};
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let mut client = Client::new(AgentOptions::default())?;
//! let handle = client.interrupt_handle(); // Clone of the Arc for use in other threads
//!
//! // In another thread or async task:
//! tokio::spawn(async move {
//!     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
//!     handle.store(true, std::sync::atomic::Ordering::SeqCst);
//! });
//!
//! client.send("Long request").await?;
//! while let Some(_block) = client.receive().await? {
//!     // Will stop when interrupted
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Hook Integration
//!
//! Hooks provide extension points throughout the request lifecycle:
//!
//! - **UserPromptSubmit**: Called before sending the user prompt (can modify or block it)
//! - **PreToolUse**: Called before executing each tool (can modify input or block execution)
//! - **PostToolUse**: Called after tool execution (can modify the result)
//!
//! Each hook fires only at its lifecycle point above and has access to the conversation history.
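//!
//! A sketch of a result-inspecting hook (assumption: an `add_post_tool_use`
//! builder mirroring the `add_pre_tool_use`/`add_user_prompt_submit` builders
//! shown in the examples below; the exact event fields are not spelled out here):
//!
//! ```ignore
//! use open_agent::{Hooks, HookDecision};
//!
//! let hooks = Hooks::new().add_post_tool_use(|event| async move {
//!     // Inspect (or rewrite) the tool result before it enters history.
//!     println!("Tool finished: {}", event.tool_name); // field name assumed
//!     Some(HookDecision::continue_())
//! });
//! ```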
//!
//! ## Error Handling
//!
//! Errors are propagated immediately and leave the client in a valid state:
//!
//! - **HTTP errors**: Network failures, timeouts, connection issues
//! - **API errors**: Invalid model, authentication failures, rate limits
//! - **Parse errors**: Malformed SSE responses, invalid JSON
//! - **Tool errors**: Tool execution failures (converted to JSON error responses)
//! - **Hook errors**: Hook execution failures or blocked operations
//!
//! After an error, the client remains usable for new requests.
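//!
//! A minimal sketch of recovering after a failed request (the concrete `Error`
//! variants are defined elsewhere in the crate):
//!
//! ```rust,no_run
//! # use open_agent::{Client, AgentOptions};
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let mut client = Client::new(AgentOptions::default())?;
//!
//! if let Err(e) = client.send("First attempt").await {
//!     eprintln!("Request failed: {}", e);
//!     // The client is still in a valid state; retrying is safe.
//!     client.send("Second attempt").await?;
//! }
//! while let Some(_block) = client.receive().await? {}
//! # Ok(())
//! # }
//! ```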
//!
//! # Examples
//!
//! ## Simple Single-Turn Query
//!
//! ```rust,no_run
//! use open_agent::{query, AgentOptions};
//! use futures::StreamExt;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let options = AgentOptions::builder()
//!         .model("gpt-4")
//!         .api_key("sk-...")
//!         .build()?;
//!
//!     let mut stream = query("What is Rust?", &options).await?;
//!
//!     while let Some(block) = stream.next().await {
//!         if let open_agent::ContentBlock::Text(text) = block? {
//!             print!("{}", text.text);
//!         }
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Multi-Turn Conversation
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock};
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .build()?)?;
//!
//! // First question
//! client.send("What's the capital of France?").await?;
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//!
//! // Follow-up question (history is maintained)
//! client.send("What's its population?").await?;
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Manual Tool Execution
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
//! use serde_json::json;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let calculator = Tool::new(
//!     "calculator",
//!     "Performs arithmetic operations",
//!     json!({"type": "object", "properties": {"operation": {"type": "string"}}}),
//!     |_input| Box::pin(async move {
//!         // Custom execution logic
//!         Ok(json!({"result": 42}))
//!     })
//! );
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .tools(vec![calculator])
//!     .build()?)?;
//!
//! client.send("Calculate 2+2").await?;
//!
//! while let Some(block) = client.receive().await? {
//!     match block {
//!         ContentBlock::ToolUse(tool_use) => {
//!             println!("Model wants to use: {}", tool_use.name());
//!
//!             // Execute tool manually
//!             let tool = client.get_tool(tool_use.name()).unwrap();
//!             let result = tool.execute(tool_use.input().clone()).await?;
//!
//!             // Add result and continue
//!             client.add_tool_result(tool_use.id(), result)?;
//!             client.send("").await?;
//!         }
//!         ContentBlock::Text(text) => {
//!             println!("Response: {}", text.text);
//!         }
//!         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Automatic Tool Execution
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
//! use serde_json::json;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let calculator = Tool::new(
//!     "calculator",
//!     "Performs arithmetic operations",
//!     json!({"type": "object"}),
//!     |_input| Box::pin(async move { Ok(json!({"result": 42})) })
//! );
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .tools(vec![calculator])
//!     .auto_execute_tools(true)  // Enable auto-execution
//!     .max_tool_iterations(5)    // Max 5 tool rounds
//!     .build()?)?;
//!
//! client.send("Calculate 2+2 and then multiply by 3").await?;
//!
//! // Tools are executed automatically - you only get the final text response
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## With Hooks
//!
//! ```ignore
//! use open_agent::{Client, AgentOptions, Hooks, HookDecision};
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let hooks = Hooks::new()
//!     .add_user_prompt_submit(|event| async move {
//!         // Block prompts containing certain words
//!         if event.prompt.contains("forbidden") {
//!             return Some(HookDecision::block("Forbidden word detected"));
//!         }
//!         Some(HookDecision::continue_())
//!     })
//!     .add_pre_tool_use(|event| async move {
//!         // Log all tool uses
//!         println!("Executing tool: {}", event.tool_name);
//!         Some(HookDecision::continue_())
//!     });
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .base_url("http://localhost:1234/v1")
//!     .hooks(hooks)
//!     .build()?)?;
//!
//! // Hooks will be executed automatically
//! client.send("Hello!").await?;
//! # Ok(())
//! # }
//! ```

use crate::types::{
    AgentOptions, ContentBlock, Message, MessageRole, OpenAIContent, OpenAIContentPart,
    OpenAIFunction, OpenAIMessage, OpenAIRequest, OpenAIToolCall, TextBlock,
};
use crate::utils::{ToolCallAggregator, parse_sse_stream};
use crate::{Error, Result};
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

/// A pinned, boxed stream of content blocks from the model.
///
/// This type alias represents an asynchronous stream that yields `ContentBlock` items.
/// Each item is wrapped in a `Result` to handle potential errors during streaming.
///
/// The stream is:
/// - **Pinned** (`Pin<Box<...>>`): Required for safe async operations and self-referential types
/// - **Boxed**: Allows dynamic dispatch and hides the concrete stream implementation
/// - **Send**: Can be safely transferred between threads
///
/// # Content Blocks
///
/// The stream can yield several types of content blocks:
///
/// - **TextBlock**: Incremental text responses from the model
/// - **ToolUseBlock**: Requests to execute a tool with specific parameters
/// - **ToolResultBlock**: Results from tool execution (in manual mode)
///
/// # Error Handling
///
/// Errors in the stream indicate issues like:
/// - Network failures or timeouts
/// - Malformed SSE events
/// - JSON parsing errors
/// - API errors from the model provider
///
/// When an error occurs, the stream typically terminates. It's the caller's responsibility
/// to handle errors appropriately.
///
/// # Examples
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions, ContentBlock};
/// use futures::StreamExt;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .build()?;
///
/// let mut stream = query("Hello!", &options).await?;
///
/// while let Some(result) = stream.next().await {
///     match result {
///         Ok(ContentBlock::Text(text)) => print!("{}", text.text),
///         Ok(_) => {}, // Other block types
///         Err(e) => eprintln!("Stream error: {}", e),
///     }
/// }
/// # Ok(())
/// # }
/// ```
pub type ContentStream = Pin<Box<dyn Stream<Item = Result<ContentBlock>> + Send>>;

/// Simple query function for single-turn interactions without conversation history.
///
/// This is a stateless convenience function for simple queries that don't require
/// multi-turn conversations. It creates a temporary HTTP client, sends a single
/// prompt, and returns a stream of content blocks.
///
/// For multi-turn conversations or more control over the interaction, use [`Client`] instead.
///
/// # Parameters
///
/// - `prompt`: The user's message to send to the model
/// - `options`: Configuration including model, API key, tools, etc.
///
/// # Returns
///
/// Returns a `ContentStream` that yields content blocks as they arrive from the model.
/// The stream must be polled to completion to receive all blocks.
///
/// # Behavior
///
/// 1. Creates a temporary HTTP client with the configured timeout
/// 2. Builds the message array (system prompt + user prompt)
/// 3. Converts tools to OpenAI format if provided
/// 4. Makes an HTTP POST request to `/chat/completions`
/// 5. Parses the Server-Sent Events (SSE) response stream
/// 6. Aggregates chunks into complete content blocks
/// 7. Returns a stream that yields blocks as they complete
///
/// # Error Handling
///
/// This function can return errors for:
/// - HTTP client creation failures
/// - Network errors during the request
/// - API errors (authentication, invalid model, rate limits, etc.)
/// - SSE parsing errors
/// - JSON deserialization errors
///
/// # Performance Notes
///
/// - Creates a new HTTP client for each call (consider using `Client` for repeated queries)
/// - Timeout is configurable via `AgentOptions::timeout` (default: 120 seconds)
/// - Streaming begins immediately; no buffering of the full response
///
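/// A sketch of tightening that timeout for short queries (assumption: the
/// builder exposes a `timeout` setter in seconds, mirroring the
/// `AgentOptions::timeout()` getter used internally; the setter name is
/// hypothetical):
///
/// ```ignore
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .timeout(30) // hypothetical setter; seconds
///     .build()?;
/// ```
///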
/// # Examples
///
/// ## Basic Usage
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions};
/// use futures::StreamExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let options = AgentOptions::builder()
///         .system_prompt("You are a helpful assistant")
///         .model("gpt-4")
///         .api_key("sk-...")
///         .build()?;
///
///     let mut stream = query("What's the capital of France?", &options).await?;
///
///     while let Some(block) = stream.next().await {
///         match block? {
///             open_agent::ContentBlock::Text(text) => {
///                 print!("{}", text.text);
///             }
///             open_agent::ContentBlock::ToolUse(_)
///             | open_agent::ContentBlock::ToolResult(_)
///             | open_agent::ContentBlock::Image(_) => {}
///         }
///     }
///
///     Ok(())
/// }
/// ```
///
/// ## With Tools
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions, Tool, ContentBlock};
/// use futures::StreamExt;
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs calculations",
///     json!({"type": "object"}),
///     |_input| Box::pin(async move { Ok(json!({"result": 42})) })
/// );
///
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .build()?;
///
/// let mut stream = query("Calculate 2+2", &options).await?;
///
/// while let Some(block) = stream.next().await {
///     match block? {
///         ContentBlock::ToolUse(tool_use) => {
///             println!("Model wants to use: {}", tool_use.name());
///             // Note: You'll need to manually execute tools and continue
///             // the conversation. For automatic execution, use Client.
///         }
///         ContentBlock::Text(text) => print!("{}", text.text),
///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Error Handling
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions};
/// use futures::StreamExt;
///
/// # async fn example() {
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("invalid-key")
///     .build()
///     .unwrap();
///
/// match query("Hello", &options).await {
///     Ok(mut stream) => {
///         while let Some(result) = stream.next().await {
///             match result {
///                 Ok(block) => println!("Block: {:?}", block),
///                 Err(e) => {
///                     eprintln!("Stream error: {}", e);
///                     break;
///                 }
///             }
///         }
///     }
///     Err(e) => eprintln!("Query failed: {}", e),
/// }
/// # }
/// ```
pub async fn query(prompt: &str, options: &AgentOptions) -> Result<ContentStream> {
    // Create HTTP client with configured timeout
    // The timeout applies to the entire request, not individual chunks
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(options.timeout()))
        .build()
        .map_err(Error::Http)?;

    // Build messages array for the API request
    // OpenAI format expects an array of message objects with role and content
    let mut messages = Vec::new();

    // Add system prompt if provided
    // System prompts set the assistant's behavior and context
    if !options.system_prompt().is_empty() {
        messages.push(OpenAIMessage {
            role: "system".to_string(),
            content: Some(OpenAIContent::Text(options.system_prompt().to_string())),
            tool_calls: None,
            tool_call_id: None,
        });
    }

    // Add user prompt
    // This is the actual query from the user
    messages.push(OpenAIMessage {
        role: "user".to_string(),
        content: Some(OpenAIContent::Text(prompt.to_string())),
        tool_calls: None,
        tool_call_id: None,
    });

    // Convert tools to OpenAI format if any are provided
    // Tools are described using JSON Schema for parameter validation
    let tools = if !options.tools().is_empty() {
        Some(
            options
                .tools()
                .iter()
                .map(|t| t.to_openai_format())
                .collect(),
        )
    } else {
        None
    };

    // Build the OpenAI-compatible request payload
    // stream=true enables Server-Sent Events for incremental responses
    let request = OpenAIRequest {
        model: options.model().to_string(),
        messages,
        stream: true, // Critical: enables SSE streaming
        max_tokens: options.max_tokens(),
        temperature: Some(options.temperature()),
        tools,
    };

    // Make HTTP POST request to the chat completions endpoint
    let url = format!("{}/chat/completions", options.base_url());
    let response = client
        .post(&url)
        .header("Authorization", format!("Bearer {}", options.api_key()))
        .header("Content-Type", "application/json")
        .json(&request)
        .send()
        .await
        .map_err(Error::Http)?;

    // Check for HTTP-level errors before processing the stream
    // This catches authentication failures, rate limits, invalid models, etc.
    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await.unwrap_or_else(|e| {
            eprintln!("WARNING: Failed to read error response body: {}", e);
            "Unknown error (failed to read response body)".to_string()
        });
        return Err(Error::api(format!("API error {}: {}", status, body)));
    }

    // Parse the Server-Sent Events (SSE) stream
    // The response body is a stream of "data: {...}" events
    let sse_stream = parse_sse_stream(response);

    // Aggregate SSE chunks into complete content blocks
    // ToolCallAggregator handles partial JSON and assembles complete tool calls
    // The scan() combinator maintains state across stream items
    let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
        let result = match chunk_result {
            Ok(chunk) => match aggregator.process_chunk(chunk) {
                Ok(blocks) => {
                    if blocks.is_empty() {
                        Some(None) // Intermediate chunk, continue streaming
                    } else {
                        Some(Some(Ok(blocks))) // Complete block(s) ready
                    }
                }
                Err(e) => Some(Some(Err(e))), // Propagate processing error
            },
            Err(e) => Some(Some(Err(e))), // Propagate stream error
        };
        futures::future::ready(result)
    });

    // Flatten the stream to emit individual blocks
    // filter_map removes None values (incomplete chunks)
    // flat_map expands Vec<ContentBlock> into individual items
    let flattened = stream
        .filter_map(|item| async move { item })
        .flat_map(|result| {
            futures::stream::iter(match result {
                Ok(blocks) => blocks.into_iter().map(Ok).collect(),
                Err(e) => vec![Err(e)],
            })
        });

    // Pin and box the stream for type erasure and safe async usage
    Ok(Box::pin(flattened))
}

/// Stateful client for multi-turn conversations with automatic history management.
///
/// The `Client` is the primary interface for building conversational AI applications.
/// It maintains conversation history, manages streaming responses, and provides two
/// modes of operation: manual and automatic tool execution.
///
/// # State Management
///
/// The client maintains several pieces of state that persist across multiple turns:
///
/// - **Conversation History**: Complete record of all messages exchanged
/// - **Active Stream**: Currently active SSE stream being consumed
/// - **Interrupt Flag**: Thread-safe cancellation signal
/// - **Auto-Execution Buffer**: Cached blocks for auto-execution mode
///
/// # Operating Modes
///
/// ## Manual Mode (default)
///
/// In manual mode, the client streams blocks directly to the caller. When the model
/// requests a tool, you receive a `ToolUseBlock`, execute the tool yourself, add the
/// result with `add_tool_result()`, and continue the conversation.
///
/// **Advantages**:
/// - Full control over tool execution
/// - Custom error handling per tool
/// - Ability to modify tool inputs/outputs
/// - Interactive debugging capabilities
///
/// ## Automatic Mode (`auto_execute_tools = true`)
///
/// In automatic mode, the client executes tools transparently and only returns the
/// final text response after all tool iterations complete.
///
/// **Advantages**:
/// - Simpler API for common use cases
/// - Built-in retry logic via hooks
/// - Automatic conversation continuation
/// - Configurable iteration limits
///
/// # Thread Safety
///
/// The client is NOT thread-safe for concurrent use. However, the interrupt mechanism
/// uses `Arc<AtomicBool>`, which can be safely shared across threads to signal cancellation.
///
/// # Memory Management
///
/// - History grows unbounded by default (consider clearing periodically)
/// - Streams are consumed lazily (low memory footprint during streaming)
/// - Auto-execution buffers the entire response (higher memory use in auto mode)
///
/// # Examples
///
/// ## Basic Multi-Turn Conversation
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .build()?)?;
///
/// // First question
/// client.send("What's the capital of France?").await?;
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "Paris is the capital of France."
///     }
/// }
///
/// // Follow-up question - history is automatically maintained
/// client.send("What's its population?").await?;
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "Paris has approximately 2.2 million people."
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Manual Tool Execution
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs arithmetic",
///     json!({"type": "object"}),
///     |_input| Box::pin(async move { Ok(json!({"result": 42})) })
/// );
///
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .build()?)?;
///
/// client.send("What's 2+2?").await?;
///
/// while let Some(block) = client.receive().await? {
///     match block {
///         ContentBlock::ToolUse(tool_use) => {
///             // Execute tool manually
///             let result = json!({"result": 4});
///             client.add_tool_result(tool_use.id(), result)?;
///
///             // Continue conversation to get model's response
///             client.send("").await?;
///         }
///         ContentBlock::Text(text) => {
///             println!("{}", text.text); // "The result is 4."
///         }
///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Automatic Tool Execution
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs arithmetic",
///     json!({"type": "object"}),
///     |_input| Box::pin(async move { Ok(json!({"result": 42})) })
/// );
///
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .auto_execute_tools(true)  // Enable auto-execution
///     .build()?)?;
///
/// client.send("What's 2+2?").await?;
///
/// // Tools execute automatically - you only receive the final text
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "The result is 4."
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## With Interruption
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions};
/// use std::time::Duration;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::new(AgentOptions::default())?;
///
/// // Start a long-running query
/// client.send("Write a very long story").await?;
///
/// // Spawn a task to interrupt after a timeout
/// let interrupt_handle = client.interrupt_handle();
/// tokio::spawn(async move {
///     tokio::time::sleep(Duration::from_secs(5)).await;
///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
/// });
///
/// // This loop will stop when interrupted
/// while let Some(_block) = client.receive().await? {
///     // Process blocks...
/// }
///
/// // The client is still usable after interruption
/// client.send("What's 2+2?").await?;
/// # Ok(())
/// # }
/// ```
pub struct Client {
    /// Configuration options including model, API key, tools, hooks, etc.
    ///
    /// This field contains all the settings that control how the client behaves.
    /// It's set once during construction and cannot be modified (though you can
    /// access it via `options()` for inspection).
    options: AgentOptions,

    /// Complete conversation history as a sequence of messages.
    ///
    /// Each message contains a role (System/User/Assistant/Tool) and content blocks.
    /// History grows unbounded by default - use `clear_history()` to reset.
    ///
    /// **Important**: The history includes ALL messages, not just user/assistant.
    /// This includes tool results and intermediate assistant messages from tool calls.
    history: Vec<Message>,

    /// Currently active SSE stream being consumed.
    ///
    /// This is `Some(stream)` while a response is being received, and `None` when
    /// no request is in flight or after a response completes.
    ///
    /// The stream is set by `send()` and consumed by `receive()`. When the stream
    /// is exhausted, `receive()` returns `Ok(None)` and sets this back to `None`.
    current_stream: Option<ContentStream>,

    /// Reusable HTTP client for making API requests.
    ///
    /// Configured once during construction with the timeout from `AgentOptions`.
    /// Reusing the same client across requests enables connection pooling and
    /// better performance for multi-turn conversations.
    http_client: reqwest::Client,

    /// Thread-safe interrupt flag for cancellation.
    ///
    /// This `Arc<AtomicBool>` can be cloned and shared across threads or async tasks
    /// to signal cancellation. When set to `true`, the next `receive()` call will
    /// return `Ok(None)` and clear the current stream.
    ///
    /// The flag is automatically reset to `false` at the start of each `send()` call.
    ///
    /// **Thread Safety**: Can be safely accessed from multiple threads using atomic
    /// operations. However, only one thread should call `send()`/`receive()`.
    interrupted: Arc<AtomicBool>,

    /// Buffer of content blocks for auto-execution mode.
    ///
    /// When `auto_execute_tools` is enabled, `receive()` internally calls the
    /// auto-execution loop which buffers all final text blocks here. Subsequent
    /// calls to `receive()` return blocks from this buffer one at a time.
    ///
    /// **Only used when `options.auto_execute_tools == true`**.
    ///
    /// The buffer is cleared when starting a new auto-execution loop.
    auto_exec_buffer: Vec<ContentBlock>,

    /// Current read position in the auto-execution buffer.
    ///
    /// Tracks which block to return next when `receive()` is called in auto mode.
    /// Reset to 0 when the buffer is refilled with a new response.
    ///
    /// **Only used when `options.auto_execute_tools == true`**.
    auto_exec_index: usize,

    /// Accumulator for assistant response blocks in manual mode.
    ///
    /// In manual mode, `receive()` streams blocks one at a time to the caller.
    /// This buffer collects those blocks so that when the stream ends, the
    /// complete assistant message can be added to conversation history.
    ///
    /// **Only used when `options.auto_execute_tools == false`**.
    manual_receive_buffer: Vec<ContentBlock>,
}

impl Client {
    /// Creates a new client with the specified configuration.
    ///
    /// This constructor initializes all state fields and creates a reusable HTTP client
    /// configured with the timeout from `AgentOptions`.
    ///
    /// # Parameters
    ///
    /// - `options`: Configuration including model, API key, tools, hooks, etc.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot be built. This can happen due to:
    /// - Invalid TLS configuration
    /// - System resource exhaustion
    /// - Invalid timeout values
    ///
    /// # Examples
    ///
    /// ```rust
    /// use open_agent::{Client, AgentOptions};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::builder()
    ///     .model("gpt-4")
    ///     .base_url("http://localhost:1234/v1")
    ///     .build()?)?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(options: AgentOptions) -> Result<Self> {
        // Build HTTP client with configured timeout
        // This client is reused across all requests for connection pooling
        let http_client = reqwest::Client::builder()
            .timeout(Duration::from_secs(options.timeout()))
            .build()
            .map_err(|e| Error::config(format!("Failed to build HTTP client: {}", e)))?;

        Ok(Self {
            options,
            history: Vec::new(),  // Empty conversation history
            current_stream: None, // No active stream yet
            http_client,
            interrupted: Arc::new(AtomicBool::new(false)), // Not interrupted initially
            auto_exec_buffer: Vec::new(),                  // Empty buffer for auto mode
            auto_exec_index: 0,                            // Start at beginning of buffer
            manual_receive_buffer: Vec::new(),             // Empty buffer for manual mode
        })
    }

    /// Sends a user message and initiates streaming of the model's response.
    ///
    /// This method performs several critical steps:
    ///
    /// 1. Executes UserPromptSubmit hooks (which can modify or block the prompt)
    /// 2. Adds the user message to conversation history
    /// 3. Builds and sends the HTTP request to the OpenAI-compatible API
    /// 4. Parses the SSE stream and sets up aggregation
    /// 5. Stores the stream for consumption via `receive()`
    ///
    /// # Parameters
    ///
    /// - `prompt`: The user's message. Can be empty to continue the conversation after
    ///   adding tool results (a common pattern in manual tool execution mode).
    ///
    /// # Returns
    ///
    /// - `Ok(())`: Request sent successfully, call `receive()` to get blocks
    /// - `Err(e)`: Request failed (network error, API error, hook blocked, etc.)
    ///
    /// # Behavior Details
    ///
    /// ## Hook Execution
    ///
    /// Before sending, UserPromptSubmit hooks are executed. Hooks can:
    /// - Modify the prompt text
    /// - Block the request entirely
    /// - Access conversation history
    ///
    /// If a hook blocks the request, this method returns an error immediately.
    ///
    /// ## History Management
    ///
    /// The prompt is added to history BEFORE sending the request. This ensures
    /// that history is consistent even if the request fails.
    ///
    /// ## Stream Setup
    ///
    /// The response stream is set up but not consumed. You must call `receive()`
    /// repeatedly to get content blocks. The stream remains active until:
    /// - All blocks are consumed (stream naturally ends)
    /// - An error occurs
    /// - Interrupt is triggered
    ///
    /// ## Interrupt Handling
    ///
    /// The interrupt flag is reset to `false` at the start of this method,
    /// allowing a fresh request after a previous interruption.
    ///
    /// # State Changes
    ///
    /// - Resets `interrupted` flag to `false`
    /// - Appends user message to `history`
    /// - Sets `current_stream` to new SSE stream
    /// - Does NOT modify `auto_exec_buffer` or `auto_exec_index`
    ///
    /// # Errors
    ///
    /// Returns errors for:
    /// - Hook blocking the prompt
    /// - HTTP client errors (network failure, DNS, etc.)
    /// - API errors (auth failure, invalid model, rate limits)
    /// - Invalid response format
    ///
    /// After an error, the client remains usable for new requests.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Hello!").await?;
    ///
    /// while let Some(_block) = client.receive().await? {
    ///     // Process blocks...
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Continuing After Tool Result
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # use serde_json::json;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Use the calculator").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         // Execute tool and add result
    ///         client.add_tool_result(tool_use.id(), json!({"result": 42}))?;
    ///
    ///         // Continue conversation with empty prompt
    ///         client.send("").await?;
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn send(&mut self, prompt: &str) -> Result<()> {
        use crate::hooks::UserPromptSubmitEvent;

        // Reset interrupt flag for new query
        // This allows the client to be reused after a previous interruption
        // Uses SeqCst ordering to ensure visibility across all threads
        self.interrupted.store(false, Ordering::SeqCst);

        // Discard any leftover manual-mode blocks from an abandoned stream.
        // If the prior stream completed normally, receive() already committed
        // the buffer to history on EOF. If the caller is calling send() before
        // the stream finished, the buffer is partial and must not be persisted.
        self.manual_receive_buffer.clear();
        self.current_stream = None;

        // Execute UserPromptSubmit hooks
        // Hooks run BEFORE adding to history, allowing modification or blocking
        let mut final_prompt = prompt.to_string();
        let history_snapshot: Vec<serde_json::Value> = self
            .history
            .iter()
            .map(|_| serde_json::json!({})) // Simplified snapshot for hooks
            .collect();

        // Create hook event with current prompt and history
        let event = UserPromptSubmitEvent::new(final_prompt.clone(), history_snapshot);

        // Execute all registered UserPromptSubmit hooks
        if let Some(decision) = self.options.hooks().execute_user_prompt_submit(event).await {
            // Check if hook wants to block execution
            if !decision.continue_execution() {
                return Err(Error::other(format!(
                    "Prompt blocked by hook: {}",
                    decision.reason().unwrap_or("")
                )));
            }
            // Apply any prompt modifications from hooks
            if let Some(modified) = decision.modified_prompt() {
                final_prompt = modified.to_string();
            }
        }

        // Add user message to history BEFORE sending request
        // This ensures history consistency even if request fails
        // Empty prompts are still added (needed for tool continuation)
        self.history.push(Message::user(final_prompt));

        // Build messages array for API request
        // This includes system prompt + full conversation history
        let mut messages = Vec::new();

        // Add system prompt as first message if configured
        // System prompts are added fresh for each request (not from history)
        if !self.options.system_prompt().is_empty() {
            messages.push(OpenAIMessage {
                role: "system".to_string(),
                content: Some(OpenAIContent::Text(
                    self.options.system_prompt().to_string(),
                )),
                tool_calls: None,
                tool_call_id: None,
            });
        }

        // Convert conversation history to OpenAI message format
        // This includes user prompts, assistant responses, and tool results
        for msg in &self.history {
            // Separate blocks by type to determine message structure
            let mut text_blocks = Vec::new();
            let mut image_blocks = Vec::new();
            let mut tool_use_blocks = Vec::new();
            let mut tool_result_blocks = Vec::new();

            for block in &msg.content {
                match block {
                    ContentBlock::Text(text) => text_blocks.push(text),
                    ContentBlock::Image(image) => image_blocks.push(image),
                    ContentBlock::ToolUse(tool_use) => tool_use_blocks.push(tool_use),
                    ContentBlock::ToolResult(tool_result) => tool_result_blocks.push(tool_result),
                }
            }

            // Handle different message types based on content blocks
            // Case 1: Message contains tool results (should be separate tool messages)
            if !tool_result_blocks.is_empty() {
                for tool_result in tool_result_blocks {
                    // Serialize the tool result content as JSON string
                    let content =
                        serde_json::to_string(tool_result.content()).unwrap_or_else(|e| {
                            format!("{{\"error\": \"Failed to serialize: {}\"}}", e)
                        });

                    messages.push(OpenAIMessage {
                        role: "tool".to_string(),
                        content: Some(OpenAIContent::Text(content)),
                        tool_calls: None,
                        tool_call_id: Some(tool_result.tool_use_id().to_string()),
                    });
                }
            }
            // Case 2: Message contains tool use blocks (assistant with tool calls)
            else if !tool_use_blocks.is_empty() {
                // Build tool_calls array
                let tool_calls: Vec<OpenAIToolCall> = tool_use_blocks
                    .iter()
                    .map(|tool_use| {
                        // Serialize the input as a JSON string (OpenAI API requirement)
                        let arguments = serde_json::to_string(tool_use.input())
                            .unwrap_or_else(|_| "{}".to_string());

                        OpenAIToolCall {
                            id: tool_use.id().to_string(),
                            call_type: "function".to_string(),
                            function: OpenAIFunction {
                                name: tool_use.name().to_string(),
                                arguments,
                            },
                        }
                    })
                    .collect();

                // Extract any text content (some models include reasoning before tool calls)
                // Note: OpenAI API requires content field even if empty when tool_calls present
                let content = if !text_blocks.is_empty() {
                    let text = text_blocks
                        .iter()
                        .map(|t| t.text.as_str())
                        .collect::<Vec<_>>()
                        .join("\n");
                    Some(OpenAIContent::Text(text))
                } else {
                    // Empty string satisfies OpenAI API schema (content is required)
                    Some(OpenAIContent::Text(String::new()))
                };

                messages.push(OpenAIMessage {
                    role: "assistant".to_string(),
                    content,
                    tool_calls: Some(tool_calls),
                    tool_call_id: None,
                });
            }
            // Case 3: Message contains images (use OpenAIContent::Parts)
            else if !image_blocks.is_empty() {
                // Log debug info about images being serialized
                log::debug!(
                    "Serializing message with {} image(s) for {:?} role",
                    image_blocks.len(),
                    msg.role
                );

                // Build content parts array preserving original order
                let mut content_parts = Vec::new();

                // Re-iterate through content blocks to maintain order
                for block in &msg.content {
                    match block {
                        ContentBlock::Text(text) => {
                            content_parts.push(OpenAIContentPart::text(&text.text));
                        }
                        ContentBlock::Image(image) => {
                            // Log image details (truncate URL for privacy)
                            let url_display = if image.url().len() > 100 {
                                format!("{}... ({} chars)", &image.url()[..100], image.url().len())
                            } else {
                                image.url().to_string()
                            };
                            let detail_str = match image.detail() {
                                crate::types::ImageDetail::Low => "low",
                                crate::types::ImageDetail::High => "high",
                                crate::types::ImageDetail::Auto => "auto",
                            };
                            log::debug!("  - Image: {} (detail: {})", url_display, detail_str);

                            content_parts.push(OpenAIContentPart::from_image(image));
                        }
                        ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
                    }
                }

                // Defensive check: content_parts should never be empty at this point
                // If it is, it indicates a logic error (e.g., all blocks were filtered out)
                if content_parts.is_empty() {
                    return Err(Error::other(
                        "Internal error: Message with images produced empty content array",
                    ));
                }

                let role_str = match msg.role {
                    MessageRole::System => "system",
                    MessageRole::User => "user",
                    MessageRole::Assistant => "assistant",
                    MessageRole::Tool => "tool",
                };

                messages.push(OpenAIMessage {
                    role: role_str.to_string(),
                    content: Some(OpenAIContent::Parts(content_parts)),
                    tool_calls: None,
                    tool_call_id: None,
                });
            }
            // Case 4: Message contains only text (normal message, backward compatible)
            else {
                let content = text_blocks
                    .iter()
                    .map(|t| t.text.as_str())
                    .collect::<Vec<_>>()
                    .join("\n");

                let role_str = match msg.role {
                    MessageRole::System => "system",
                    MessageRole::User => "user",
                    MessageRole::Assistant => "assistant",
                    MessageRole::Tool => "tool",
                };

                messages.push(OpenAIMessage {
                    role: role_str.to_string(),
                    content: Some(OpenAIContent::Text(content)),
                    tool_calls: None,
                    tool_call_id: None,
                });
            }
        }

        // Convert tools to OpenAI format if any are registered
        // Each tool is described with name, description, and JSON Schema parameters
        let tools = if !self.options.tools().is_empty() {
            Some(
                self.options
                    .tools()
                    .iter()
                    .map(|t| t.to_openai_format())
                    .collect(),
            )
        } else {
            None
        };

        // Build the OpenAI-compatible request payload
        let request = OpenAIRequest {
            model: self.options.model().to_string(),
            messages,
            stream: true, // Always stream for progressive rendering
            max_tokens: self.options.max_tokens(),
            temperature: Some(self.options.temperature()),
            tools,
        };

        // Make HTTP POST request to chat completions endpoint
        let url = format!("{}/chat/completions", self.options.base_url());
        let response = self
            .http_client
            .post(&url)
            .header(
                "Authorization",
                format!("Bearer {}", self.options.api_key()),
            )
            .header("Content-Type", "application/json")
            .json(&request)
            .send()
            .await
            .map_err(Error::Http)?;

        // Check for HTTP-level errors before processing stream
        // This catches authentication, rate limits, invalid models, etc.
        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_else(|e| {
                eprintln!("WARNING: Failed to read error response body: {}", e);
                "Unknown error (failed to read response body)".to_string()
            });
            return Err(Error::api(format!("API error {}: {}", status, body)));
        }

        // Parse Server-Sent Events (SSE) stream from response
        let sse_stream = parse_sse_stream(response);

        // Aggregate SSE chunks into complete content blocks
        // ToolCallAggregator maintains state to handle incremental JSON chunks
        // that may arrive split across multiple SSE events
        let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
            let result = match chunk_result {
                Ok(chunk) => match aggregator.process_chunk(chunk) {
                    Ok(blocks) => {
                        if blocks.is_empty() {
                            Some(None) // Partial chunk, keep aggregating
                        } else {
                            Some(Some(Ok(blocks))) // Complete block(s) ready
                        }
                    }
                    Err(e) => Some(Some(Err(e))), // Processing error
                },
                Err(e) => Some(Some(Err(e))), // Stream error
            };
            futures::future::ready(result)
        });

        // Flatten the stream to emit individual blocks
        // filter_map removes None values (partial chunks)
        // flat_map converts Vec<ContentBlock> to individual items
        let flattened = stream
            .filter_map(|item| async move { item })
            .flat_map(|result| {
                futures::stream::iter(match result {
                    Ok(blocks) => blocks.into_iter().map(Ok).collect(),
                    Err(e) => vec![Err(e)],
                })
            });

        // Store the stream for consumption via receive()
        // The stream is NOT consumed here - that happens in receive()
        self.current_stream = Some(Box::pin(flattened));

        Ok(())
    }

    /// Internal method that returns one block from the current stream.
    ///
    /// This is the core streaming logic extracted for reuse by both manual mode
    /// and auto-execution mode. It handles interrupt checking and stream consumption.
    ///
    /// # Returns
    ///
    /// - `Ok(Some(block))`: Successfully received a content block
    /// - `Ok(None)`: Stream ended naturally or was interrupted
    /// - `Err(e)`: An error occurred during streaming
    ///
    /// # State Changes
    ///
    /// - Sets `current_stream` to `None` when the stream ends naturally; on interrupt,
    ///   the stream is left intact so callers can distinguish the two cases
    /// - Does not modify history or other state
    ///
    /// # Implementation Notes
    ///
    /// This method checks the interrupt flag on every call, allowing responsive
    /// cancellation. The check uses SeqCst ordering for immediate visibility of
    /// interrupts from other threads.
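    ///
    /// A sketch of the consumption pattern this method supports (internal; public
    /// callers use `receive()`, which layers buffering and history commits on top):
    ///
    /// ```ignore
    /// while let Some(block) = self.receive_one().await? {
    ///     // Manual mode: forward each block to the caller as it arrives.
    ///     yield_to_caller(block); // hypothetical helper, for illustration only
    /// }
    /// ```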
    async fn receive_one(&mut self) -> Result<Option<ContentBlock>> {
        // Check interrupt flag before attempting to receive
        // Uses SeqCst to ensure we see the latest value from any thread
        if self.interrupted.load(Ordering::SeqCst) {
            // Return None but leave current_stream intact so callers can
            // distinguish "interrupted a live stream" (current_stream is Some)
            // from "interrupt after stream already ended" (current_stream is None).
            return Ok(None);
        }

        // Poll the current stream if one exists
        if let Some(stream) = &mut self.current_stream {
            match stream.next().await {
                Some(Ok(block)) => Ok(Some(block)),
                Some(Err(e)) => Err(e),
                None => {
                    // Natural EOF: mark stream as fully consumed
                    self.current_stream = None;
                    Ok(None)
                }
            }
        } else {
            // No active stream
            Ok(None)
        }
    }
1424
1425    /// Collects all blocks from the current stream into a vector.
1426    ///
1427    /// Internal helper for auto-execution mode. This method buffers the entire
1428    /// response in memory, which is necessary to determine if the response contains
1429    /// tool calls before returning anything to the caller.
1430    ///
1431    /// # Returns
1432    ///
1433    /// - `Ok(vec)`: Successfully collected all blocks
1434    /// - `Err(e)`: Error during collection or interrupted
1435    ///
1436    /// # Memory Usage
1437    ///
1438    /// This buffers the entire response, which can be large for long completions.
1439    /// Consider the memory implications when using auto-execution mode.
1440    ///
1441    /// # Interruption
1442    ///
1443    /// Checks the interrupt flag after each received block and returns an error
    /// if it is set. An interrupt observed by `receive_one()` before the next
    /// block arrives instead ends collection early, returning the blocks gathered
    /// so far.
1444    async fn collect_all_blocks(&mut self) -> Result<Vec<ContentBlock>> {
1445        let mut blocks = Vec::new();
1446
1447        // Consume entire stream into vector
1448        while let Some(block) = self.receive_one().await? {
1449            // Check interrupt during collection for responsiveness
1450            if self.interrupted.load(Ordering::SeqCst) {
1451                self.current_stream = None;
1452                return Err(Error::other(
1453                    "Operation interrupted during block collection",
1454                ));
1455            }
1456
1457            blocks.push(block);
1458        }
1459
1460        Ok(blocks)
1461    }
1462
1463    /// Executes a tool by name with the given input.
1464    ///
1465    /// Internal helper for auto-execution mode. Looks up the tool in the registered
1466    /// tools list and executes it with the provided input.
1467    ///
1468    /// # Parameters
1469    ///
1470    /// - `tool_name`: Name of the tool to execute
1471    /// - `input`: JSON value containing tool parameters
1472    ///
1473    /// # Returns
1474    ///
1475    /// - `Ok(result)`: Tool executed successfully, returns result as JSON
1476    /// - `Err(e)`: Tool not found or execution failed
1477    ///
1478    /// # Error Handling
1479    ///
1480    /// If the tool is not found in the registry, returns a ToolError.
1481    /// If execution fails, the error from the tool is propagated.
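    ///
    /// The same lookup-then-execute pattern is available to callers through the
    /// public [`get_tool()`](Client::get_tool) accessor; a minimal sketch (the
    /// tool name and input here are illustrative):
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # use serde_json::json;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let client = Client::new(AgentOptions::default())?;
    /// let result = match client.get_tool("calculator") {
    ///     Some(tool) => tool.execute(json!({"x": 2, "y": 2})).await?,
    ///     None => json!({"error": "Tool 'calculator' not found"}),
    /// };
    /// println!("{}", result);
    /// # Ok(())
    /// # }
    /// ```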
1482    async fn execute_tool_internal(
1483        &self,
1484        tool_name: &str,
1485        input: serde_json::Value,
1486    ) -> Result<serde_json::Value> {
1487        // Find tool in registered tools by name
1488        let tool = self
1489            .options
1490            .tools()
1491            .iter()
1492            .find(|t| t.name() == tool_name)
1493            .ok_or_else(|| Error::tool(format!("Tool '{}' not found", tool_name)))?;
1494
1495        // Execute the tool's async function
1496        tool.execute(input).await
1497    }
1498
1499    /// Auto-execution loop that handles tool calls automatically.
1500    ///
1501    /// This is the core implementation of automatic tool execution mode. It:
1502    ///
1503    /// 1. Collects all blocks from the current stream
1504    /// 2. Separates text blocks from tool use blocks
1505    /// 3. If there are tool blocks:
1506    ///    - Executes PreToolUse hooks (can modify/block)
1507    ///    - Executes each tool via its registered function
1508    ///    - Executes PostToolUse hooks (can modify result)
1509    ///    - Adds results to history
1510    ///    - Continues conversation with send("")
1511    /// 4. Repeats until text-only response or max iterations
1512    /// 5. Returns all final text blocks
1513    ///
1514    /// # Returns
1515    ///
1516    /// - `Ok(blocks)`: Final text blocks after all tool iterations
1517    /// - `Err(e)`: Error during execution, stream processing, or interruption
1518    ///
1519    /// # Iteration Limit
1520    ///
1521    /// The loop is bounded by `options.max_tool_iterations` to prevent infinite loops.
1522    /// When the limit is reached, the loop stops without executing the pending
1523    /// tool calls and returns the text blocks from the final response.
1524    ///
1525    /// # Hook Integration
1526    ///
1527    /// Hooks are executed for each tool call:
1528    /// - **PreToolUse**: Can modify input or block execution entirely
1529    /// - **PostToolUse**: Can modify the result before it's added to history
1530    ///
1531    /// If a hook blocks execution, a JSON error response is used as the tool result.
1532    ///
1533    /// # State Management
1534    ///
1535    /// The loop maintains history by adding:
1536    /// - Assistant messages with text + tool use blocks
1537    /// - User messages with tool result blocks
1538    ///
1539    /// This creates a proper conversation flow that the model can follow.
1540    ///
1541    /// # Error Recovery
1542    ///
1543    /// If a tool execution fails, the error is converted to a JSON error response
1544    /// and added as the tool result. This allows the conversation to continue
1545    /// and lets the model handle the error.
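    ///
    /// # Example
    ///
    /// A minimal caller-side sketch of auto-execution; this assumes the options
    /// builder exposes a `max_tool_iterations` setter mirroring the getter used
    /// below:
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::builder()
    ///     .auto_execute_tools(true)
    ///     .max_tool_iterations(5)
    ///     .build()?)?;
    ///
    /// client.send("Look up the weather and summarize it").await?;
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::Text(text) = block {
    ///         print!("{}", text.text);
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```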
1546    async fn auto_execute_loop(&mut self) -> Result<Vec<ContentBlock>> {
1547        use crate::types::ToolResultBlock;
1548
1549        // Track iterations to prevent infinite loops
1550        let mut iteration = 0;
1551        let max_iterations = self.options.max_tool_iterations();
1552
1553        loop {
1554            // ========================================================================
1555            // STEP 1: Collect all blocks from current stream
1556            // ========================================================================
1557            // Buffer the entire response to determine if it contains tool calls
1558            let blocks = self.collect_all_blocks().await?;
1559
1560            // Empty response means stream ended or was interrupted
1561            if blocks.is_empty() {
1562                return Ok(Vec::new());
1563            }
1564
1565            // ========================================================================
1566            // STEP 2: Separate text blocks from tool use blocks
1567            // ========================================================================
1568            // The model can return a mix of text and tool calls in one response
1569            let mut text_blocks = Vec::new();
1570            let mut tool_blocks = Vec::new();
1571
1572            for block in blocks {
1573                match block {
1574                    ContentBlock::Text(_) => text_blocks.push(block),
1575                    ContentBlock::ToolUse(_) => tool_blocks.push(block),
                        // ToolResult and Image blocks originate from the caller's
                        // messages, not the model's response stream; ignore them.
1576                    ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
1577                }
1578            }
1579
1580            // ========================================================================
1581            // STEP 3: Check if we're done (no tool calls)
1582            // ========================================================================
1583            // If the response contains no tool calls, we've reached the final answer
1584            if tool_blocks.is_empty() {
1585                // Add assistant's final text response to history
1586                if !text_blocks.is_empty() {
1587                    let assistant_msg = Message::assistant(text_blocks.clone());
1588                    self.history.push(assistant_msg);
1589                }
1590                // Return text blocks to caller via buffered receive()
1591                return Ok(text_blocks);
1592            }
1593
1594            // ========================================================================
1595            // STEP 4: Check iteration limit BEFORE executing tools
1596            // ========================================================================
1597            // Increment counter and check if we've hit the max
1598            iteration += 1;
1599            if iteration > max_iterations {
1600                // Max iterations reached - stop execution and return what we have
1601                // This prevents infinite tool-calling loops
1602                if !text_blocks.is_empty() {
1603                    let assistant_msg = Message::assistant(text_blocks.clone());
1604                    self.history.push(assistant_msg);
1605                }
1606                return Ok(text_blocks);
1607            }
1608
1609            // ========================================================================
1610            // STEP 5: Add assistant message to history
1611            // ========================================================================
1612            // The assistant message includes BOTH text and tool use blocks
1613            // This preserves the full context for future turns
1614            let mut all_blocks = text_blocks.clone();
1615            all_blocks.extend(tool_blocks.clone());
1616            let assistant_msg = Message::assistant(all_blocks);
1617            self.history.push(assistant_msg);
1618
1619            // ========================================================================
1620            // STEP 6: Execute all tools and collect results
1621            // ========================================================================
1622            for block in tool_blocks {
1623                if let ContentBlock::ToolUse(tool_use) = block {
1624                    // Create simplified history snapshot for hooks
1625                    // TODO: Full serialization of history for hooks
1626                    let history_snapshot: Vec<serde_json::Value> =
1627                        self.history.iter().map(|_| serde_json::json!({})).collect();
1628
1629                    // ============================================================
1630                    // Execute PreToolUse hooks
1631                    // ============================================================
1632                    use crate::hooks::PreToolUseEvent;
1633                    let pre_event = PreToolUseEvent::new(
1634                        tool_use.name().to_string(),
1635                        tool_use.input().clone(),
1636                        tool_use.id().to_string(),
1637                        history_snapshot.clone(),
1638                    );
1639
1640                    // Track whether to execute and what input to use
1641                    let mut tool_input = tool_use.input().clone();
1642                    let mut should_execute = true;
1643                    let mut block_reason = None;
1644
1645                    // Execute all PreToolUse hooks
1646                    if let Some(decision) =
1647                        self.options.hooks().execute_pre_tool_use(pre_event).await
1648                    {
1649                        if !decision.continue_execution() {
1650                            // Hook blocked execution
1651                            should_execute = false;
1652                            block_reason = decision.reason().map(|s| s.to_string());
1653                        } else if let Some(modified) = decision.modified_input() {
1654                            // Hook modified the input
1655                            tool_input = modified.clone();
1656                        }
1657                    }
1658
1659                    // ============================================================
1660                    // Execute tool (or create error result if blocked)
1661                    // ============================================================
1662                    let result = if should_execute {
1663                        // Actually execute the tool
1664                        match self
1665                            .execute_tool_internal(tool_use.name(), tool_input.clone())
1666                            .await
1667                        {
1668                            Ok(res) => res, // Success - use the result
1669                            Err(e) => {
1670                                // Tool execution failed - convert to JSON error
1671                                // This allows the conversation to continue
1672                                serde_json::json!({
1673                                    "error": e.to_string(),
1674                                    "tool": tool_use.name(),
1675                                    "id": tool_use.id()
1676                                })
1677                            }
1678                        }
1679                    } else {
1680                        // Tool blocked by PreToolUse hook - create error result
1681                        serde_json::json!({
1682                            "error": "Tool execution blocked by hook",
1683                            "reason": block_reason.unwrap_or_else(|| "No reason provided".to_string()),
1684                            "tool": tool_use.name(),
1685                            "id": tool_use.id()
1686                        })
1687                    };
1688
1689                    // ============================================================
1690                    // Execute PostToolUse hooks
1691                    // ============================================================
1692                    use crate::hooks::PostToolUseEvent;
1693                    let post_event = PostToolUseEvent::new(
1694                        tool_use.name().to_string(),
1695                        tool_input,
1696                        tool_use.id().to_string(),
1697                        result.clone(),
1698                        history_snapshot,
1699                    );
1700
1701                    let mut final_result = result;
1702                    if let Some(decision) =
1703                        self.options.hooks().execute_post_tool_use(post_event).await
1704                    {
1705                        // PostToolUse can modify the result
1706                        // Note: Uses modified_input field (naming is historical)
1707                        if let Some(modified) = decision.modified_input() {
1708                            final_result = modified.clone();
1709                        }
1710                    }
1711
1712                    // ============================================================
1713                    // Add tool result to history
1714                    // ============================================================
1715                    // Tool results are added as user messages (per OpenAI convention)
1716                    let tool_result = ToolResultBlock::new(tool_use.id(), final_result);
1717                    let tool_result_msg =
1718                        Message::user_with_blocks(vec![ContentBlock::ToolResult(tool_result)]);
1719                    self.history.push(tool_result_msg);
1720                }
1721            }
1722
1723            // ========================================================================
1724            // STEP 7: Continue conversation to get next response
1725            // ========================================================================
1726            // Send empty string to continue - the history contains all context
1727            self.send("").await?;
1728
1729            // Loop continues to collect and process the next response
1730            // This will either be more tool calls or the final text answer
1731        }
1732    }
1733
1893    /// Sends a pre-built message to the AI model.
1894    ///
1895    /// This method allows sending messages with images or custom content blocks
1896    /// that cannot be expressed as simple text prompts. Use the `Message` helper
1897    /// methods like [`user_with_image()`](Message::user_with_image),
1898    /// [`user_with_image_detail()`](Message::user_with_image_detail), or
1899    /// [`user_with_base64_image()`](Message::user_with_base64_image) to create
1900    /// messages with multimodal content.
1901    ///
1902    /// Unlike [`send()`](Client::send), this method:
1903    /// - Accepts pre-built `Message` objects instead of text prompts
1904    /// - Bypasses `UserPromptSubmit` hooks (since message is already constructed)
1905    /// - Enables multimodal interactions (text + images)
1906    ///
1907    /// After calling this method, use [`receive()`](Client::receive) to get the
1908    /// response content blocks.
1909    ///
1910    /// # Arguments
1911    ///
1912    /// * `message` - A pre-built message (typically created with `Message::user_with_image()` or similar helpers)
1913    ///
1914    /// # Errors
1915    ///
1916    /// Returns `Error` if:
1917    /// - Network request fails
1918    /// - Server returns an error
1919    /// - Response cannot be parsed
1920    /// - Request is interrupted via [`interrupt()`](Client::interrupt)
1921    ///
1922    /// # Example
1923    ///
1924    /// ```rust,no_run
1925    /// use open_agent::{Client, AgentOptions, Message, ImageDetail};
1926    ///
1927    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1928    /// let options = AgentOptions::builder()
1929    ///     .model("gpt-4-vision-preview")
1930    ///     .base_url("http://localhost:1234/v1")
1931    ///     .build()?;
1932    ///
1933    /// let mut client = Client::new(options)?;
1934    ///
1935    /// // Send a message with an image
1936    /// let msg = Message::user_with_image(
1937    ///     "What's in this image?",
1938    ///     "https://example.com/photo.jpg"
1939    /// )?;
1940    /// client.send_message(msg).await?;
1941    ///
1942    /// // Receive the response
1943    /// while let Some(block) = client.receive().await? {
1944    ///     // Process response blocks
1945    /// }
1946    /// # Ok(())
1947    /// # }
1948    /// ```
1949    pub async fn send_message(&mut self, message: Message) -> Result<()> {
1950        // Reset interrupt flag for new query
1951        // This allows the client to be reused after a previous interruption
1952        // Uses SeqCst ordering to ensure visibility across all threads
1953        self.interrupted.store(false, Ordering::SeqCst);
1954
1955        // Discard any leftover manual-mode blocks from an abandoned stream,
            // and drop any unconsumed auto-execution buffer so a new query can
            // never replay stale blocks.
1956        self.manual_receive_buffer.clear();
            self.auto_exec_buffer.clear();
            self.auto_exec_index = 0;
1957        self.current_stream = None;
1958
1959        // Note: We do NOT run UserPromptSubmit hooks here because:
1960        // 1. The message is already fully constructed
1961        // 2. Hooks expect string prompts, not complex Message objects
1962        // 3. For multimodal messages, there's no single "prompt" to modify
1963
1964        // Add message to history BEFORE sending request
1965        // This ensures history consistency even if request fails
1966        self.history.push(message);
1967
1968        // The rest of the logic is identical to send() - build and execute request
1969        // Build messages array for API request
1970        // This includes system prompt + full conversation history
1971        let mut messages = Vec::new();
1972
1973        // Add system prompt as first message if configured
1974        // System prompts are added fresh for each request (not from history)
1975        if !self.options.system_prompt().is_empty() {
1976            messages.push(OpenAIMessage {
1977                role: "system".to_string(),
1978                content: Some(OpenAIContent::Text(
1979                    self.options.system_prompt().to_string(),
1980                )),
1981                tool_calls: None,
1982                tool_call_id: None,
1983            });
1984        }
1985
1986        // Convert conversation history to OpenAI message format
1987        // This includes user prompts, assistant responses, and tool results
1988        for msg in &self.history {
1989            // Separate blocks by type to determine message structure
1990            let mut text_blocks = Vec::new();
1991            let mut image_blocks = Vec::new();
1992            let mut tool_use_blocks = Vec::new();
1993            let mut tool_result_blocks = Vec::new();
1994
1995            for block in &msg.content {
1996                match block {
1997                    ContentBlock::Text(text) => text_blocks.push(text),
1998                    ContentBlock::Image(image) => image_blocks.push(image),
1999                    ContentBlock::ToolUse(tool_use) => tool_use_blocks.push(tool_use),
2000                    ContentBlock::ToolResult(tool_result) => tool_result_blocks.push(tool_result),
2001                }
2002            }
2003
2004            // Handle different message types based on content blocks
2005            // Case 1: Message contains tool results (should be separate tool messages)
2006            if !tool_result_blocks.is_empty() {
2007                for tool_result in tool_result_blocks {
2008                    // Serialize the tool result content as JSON string
2009                    let content =
2010                        serde_json::to_string(tool_result.content()).unwrap_or_else(|e| {
2011                            format!("{{\"error\": \"Failed to serialize: {}\"}}", e)
2012                        });
2013
2014                    messages.push(OpenAIMessage {
2015                        role: "tool".to_string(),
2016                        content: Some(OpenAIContent::Text(content)),
2017                        tool_calls: None,
2018                        tool_call_id: Some(tool_result.tool_use_id().to_string()),
2019                    });
2020                }
2021            }
2022            // Case 2: Message contains tool use blocks (assistant with tool calls)
2023            else if !tool_use_blocks.is_empty() {
2024                // Build tool_calls array
2025                let tool_calls: Vec<OpenAIToolCall> = tool_use_blocks
2026                    .iter()
2027                    .map(|tool_use| {
2028                        // Serialize the input as a JSON string (OpenAI API requirement)
2029                        let arguments = serde_json::to_string(tool_use.input())
2030                            .unwrap_or_else(|_| "{}".to_string());
2031
2032                        OpenAIToolCall {
2033                            id: tool_use.id().to_string(),
2034                            call_type: "function".to_string(),
2035                            function: OpenAIFunction {
2036                                name: tool_use.name().to_string(),
2037                                arguments,
2038                            },
2039                        }
2040                    })
2041                    .collect();
2042
2043                // Extract any text content (some models include reasoning before tool calls)
2044                // Note: OpenAI API requires content field even if empty when tool_calls present
2045                let content = if !text_blocks.is_empty() {
2046                    let text = text_blocks
2047                        .iter()
2048                        .map(|t| t.text.as_str())
2049                        .collect::<Vec<_>>()
2050                        .join("\n");
2051                    Some(OpenAIContent::Text(text))
2052                } else {
2053                    // Empty string satisfies OpenAI API schema (content is required)
2054                    Some(OpenAIContent::Text(String::new()))
2055                };
2056
2057                messages.push(OpenAIMessage {
2058                    role: "assistant".to_string(),
2059                    content,
2060                    tool_calls: Some(tool_calls),
2061                    tool_call_id: None,
2062                });
2063            }
2064            // Case 3: Message contains images (use OpenAIContent::Parts)
2065            else if !image_blocks.is_empty() {
2066                // Log debug info about images being serialized
2067                log::debug!(
2068                    "Serializing message with {} image(s) for {:?} role",
2069                    image_blocks.len(),
2070                    msg.role
2071                );
2072
2073                // Build content parts array preserving original order
2074                let mut content_parts = Vec::new();
2075
2076                // Re-iterate through content blocks to maintain order
2077                for block in &msg.content {
2078                    match block {
2079                        ContentBlock::Text(text) => {
2080                            content_parts.push(OpenAIContentPart::text(&text.text));
2081                        }
2082                        ContentBlock::Image(image) => {
2083                            // Log image details, truncating long URLs (such as
                                // base64 data URLs) to keep logs readable.
2084                            let url_display = if image.url().len() > 100 {
                                    // Take the first 100 characters (not bytes) so the
                                    // slice can't split a multi-byte UTF-8 sequence.
                                    let prefix: String = image.url().chars().take(100).collect();
2085                                format!("{}... ({} chars)", prefix, image.url().len())
2086                            } else {
2087                                image.url().to_string()
2088                            };
2089                            let detail_str = match image.detail() {
2090                                crate::types::ImageDetail::Low => "low",
2091                                crate::types::ImageDetail::High => "high",
2092                                crate::types::ImageDetail::Auto => "auto",
2093                            };
2094                            log::debug!("  - Image: {} (detail: {})", url_display, detail_str);
2095
2096                            content_parts.push(OpenAIContentPart::from_image(image));
2097                        }
2098                        ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
2099                    }
2100                }
2101
2102                // Defensive check: content_parts should never be empty at this point
2103                // If it is, it indicates a logic error (e.g., all blocks were filtered out)
2104                if content_parts.is_empty() {
2105                    return Err(Error::other(
2106                        "Internal error: Message with images produced empty content array",
2107                    ));
2108                }
2109
2110                let role_str = match msg.role {
2111                    MessageRole::System => "system",
2112                    MessageRole::User => "user",
2113                    MessageRole::Assistant => "assistant",
2114                    MessageRole::Tool => "tool",
2115                };
2116
2117                messages.push(OpenAIMessage {
2118                    role: role_str.to_string(),
2119                    content: Some(OpenAIContent::Parts(content_parts)),
2120                    tool_calls: None,
2121                    tool_call_id: None,
2122                });
2123            }
2124            // Case 4: Message contains only text (normal message, backward compatible)
2125            else {
2126                let content = text_blocks
2127                    .iter()
2128                    .map(|t| t.text.as_str())
2129                    .collect::<Vec<_>>()
2130                    .join("\n");
2131
2132                let role_str = match msg.role {
2133                    MessageRole::System => "system",
2134                    MessageRole::User => "user",
2135                    MessageRole::Assistant => "assistant",
2136                    MessageRole::Tool => "tool",
2137                };
2138
2139                messages.push(OpenAIMessage {
2140                    role: role_str.to_string(),
2141                    content: Some(OpenAIContent::Text(content)),
2142                    tool_calls: None,
2143                    tool_call_id: None,
2144                });
2145            }
2146        }
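
        // For reference, one manual tool round-trip in history serializes to
        // roughly this wire shape (values illustrative; field names follow the
        // OpenAI chat completions schema modeled by the structs above):
        //
        //   {"role": "assistant", "content": "",
        //    "tool_calls": [{"id": "call_1", "type": "function",
        //                    "function": {"name": "calc", "arguments": "{\"x\":2}"}}]}
        //   {"role": "tool", "tool_call_id": "call_1", "content": "{\"result\":4}"}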
2147
2148        // Convert tools to OpenAI format if any are registered
2149        let tools = if !self.options.tools().is_empty() {
2150            Some(
2151                self.options
2152                    .tools()
2153                    .iter()
2154                    .map(|t| t.to_openai_format())
2155                    .collect(),
2156            )
2157        } else {
2158            None
2159        };
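
        // Each registered tool is assumed to serialize to the standard OpenAI
        // function-tool shape, e.g.:
        //   {"type": "function",
        //    "function": {"name": "calc", "description": "...", "parameters": {...}}}
        // (illustrative; the exact output is defined by Tool::to_openai_format())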
2160
2161        // Build the OpenAI-compatible request payload
2162        let request = OpenAIRequest {
2163            model: self.options.model().to_string(),
2164            messages,
2165            stream: true,
2166            max_tokens: self.options.max_tokens(),
2167            temperature: Some(self.options.temperature()),
2168            tools,
2169        };
2170
2171        // Make HTTP POST request to chat completions endpoint
2172        let url = format!("{}/chat/completions", self.options.base_url());
2173        let response = self
2174            .http_client
2175            .post(&url)
2176            .header(
2177                "Authorization",
2178                format!("Bearer {}", self.options.api_key()),
2179            )
2180            .header("Content-Type", "application/json")
2181            .json(&request)
2182            .send()
2183            .await
2184            .map_err(Error::Http)?;
2185
2186        // Check for HTTP-level errors
2187        if !response.status().is_success() {
2188            let status = response.status();
2189            let body = response.text().await.unwrap_or_else(|e| {
2190                log::warn!("Failed to read error response body: {}", e);
2191                "Unknown error (failed to read response body)".to_string()
2192            });
2193            return Err(Error::api(format!("API error {}: {}", status, body)));
2194        }
2195
2196        // Parse Server-Sent Events stream
2197        let sse_stream = parse_sse_stream(response);
2198
2199        // Aggregate SSE chunks into complete content blocks
2200        let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
2201            let result = match chunk_result {
2202                Ok(chunk) => match aggregator.process_chunk(chunk) {
2203                    Ok(blocks) => {
2204                        if blocks.is_empty() {
2205                            Some(None) // Partial chunk
2206                        } else {
2207                            Some(Some(Ok(blocks))) // Complete blocks
2208                        }
2209                    }
2210                    Err(e) => Some(Some(Err(e))),
2211                },
2212                Err(e) => Some(Some(Err(e))),
2213            };
2214            futures::future::ready(result)
2215        });
2216
2217        // Flatten nested Options and filter out None values
2218        let stream = stream.filter_map(futures::future::ready);
2219
2220        // Flatten Vec<ContentBlock> into individual blocks
2221        let stream = stream.flat_map(|result| {
2222            futures::stream::iter(match result {
2223                Ok(blocks) => blocks.into_iter().map(Ok).collect(),
2224                Err(e) => vec![Err(e)],
2225            })
2226        });
2227
2228        // Store the content stream for receive() to consume
2229        self.current_stream = Some(Box::pin(stream));
2230
2231        Ok(())
2232    }
2233
    /// Receives the next content block from the current stream.
    ///
    /// This is the primary method for consuming responses from the model. It works
    /// differently depending on the operating mode:
    ///
    /// ## Manual Mode (default)
    ///
    /// Streams blocks directly from the API response as they arrive. You receive:
    /// - `TextBlock`: Incremental text from the model
    /// - `ToolUseBlock`: Requests to execute tools
    /// - Other block types as they're emitted
    ///
    /// When you receive a `ToolUseBlock`, you must:
    /// 1. Execute the tool yourself
    /// 2. Call `add_tool_result()` with the result
    /// 3. Call `send("")` to continue the conversation
    ///
    /// ## Automatic Mode (`auto_execute_tools = true`)
    ///
    /// Transparently executes tools and only returns final text blocks. The first
    /// call to `receive()` triggers the auto-execution loop which:
    /// 1. Collects all blocks from the stream
    /// 2. Executes any tool calls automatically
    /// 3. Continues the conversation until reaching a text-only response
    /// 4. Buffers the final text blocks
    /// 5. Returns them one at a time on subsequent `receive()` calls
    ///
    /// # Returns
    ///
    /// - `Ok(Some(block))`: Successfully received a content block
    /// - `Ok(None)`: Stream ended normally or was interrupted
    /// - `Err(e)`: An error occurred during streaming or tool execution
    ///
    /// # Behavior Details
    ///
    /// ## Interruption
    ///
    /// Checks the interrupt flag on every call. In manual mode, an interrupt
    /// returns `Ok(None)` and clears the stream; in auto mode, an interrupt during
    /// block collection surfaces as an error. The client can be reused after
    /// interruption.
    ///
    /// ## Stream Lifecycle
    ///
    /// 1. After `send()`, stream is active
    /// 2. Each `receive()` call yields one block
    /// 3. When stream ends, returns `Ok(None)`
    /// 4. Subsequent calls continue returning `Ok(None)` until next `send()`
    ///
    /// ## Auto-Execution Buffer
    ///
    /// In auto mode, blocks are buffered in memory. The buffer persists until
    /// fully consumed (index reaches length), at which point it's cleared.
    ///
    /// # State Changes
    ///
    /// - Advances stream position
    /// - In auto mode: May trigger entire execution loop and modify history
    /// - In manual mode: Reads from the stream and commits the complete assistant
    ///   message to history when the stream ends
    /// - Increments `auto_exec_index` when returning buffered blocks
    ///
    /// # Examples
    ///
    /// ## Manual Mode - Basic
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Hello!").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     match block {
    ///         ContentBlock::Text(text) => print!("{}", text.text),
    ///         ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Manual Mode - With Tools
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # use serde_json::json;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Use the calculator").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     match block {
    ///         ContentBlock::Text(text) => {
    ///             println!("{}", text.text);
    ///         }
    ///         ContentBlock::ToolUse(tool_use) => {
    ///             println!("Executing: {}", tool_use.name());
    ///
    ///             // Execute tool manually
    ///             let result = json!({"result": 42});
    ///
    ///             // Add result and continue
    ///             client.add_tool_result(tool_use.id(), result)?;
    ///             client.send("").await?;
    ///         }
    ///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Auto Mode
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock, Tool};
    /// # use serde_json::json;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::builder()
    ///     .auto_execute_tools(true)
    ///     .build()?)?;
    ///
    /// client.send("Calculate 2+2").await?;
    ///
    /// // Tools execute automatically - you only get final text
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::Text(text) = block {
    ///         println!("{}", text.text);
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## With Error Handling
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Hello").await?;
    ///
    /// loop {
    ///     match client.receive().await {
    ///         Ok(Some(block)) => {
    ///             // Process block
    ///         }
    ///         Ok(None) => {
    ///             // Stream ended
    ///             break;
    ///         }
    ///         Err(e) => {
    ///             eprintln!("Error: {}", e);
    ///             break;
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
2234    pub async fn receive(&mut self) -> Result<Option<ContentBlock>> {
2235        // ========================================================================
2236        // AUTO-EXECUTION MODE
2237        // ========================================================================
2238        if self.options.auto_execute_tools() {
2239            // Check if we have buffered blocks to return
2240            // In auto mode, all final text blocks are buffered and returned one at a time
2241            if self.auto_exec_index < self.auto_exec_buffer.len() {
2242                // Return next buffered block
2243                let block = self.auto_exec_buffer[self.auto_exec_index].clone();
2244                self.auto_exec_index += 1;
2245                return Ok(Some(block));
2246            }
2247
2248            // No buffered blocks - need to run auto-execution loop
2249            // This only happens on the first receive() call after send()
2250            if self.auto_exec_buffer.is_empty() {
2251                match self.auto_execute_loop().await {
2252                    Ok(blocks) => {
2253                        // Buffer all final text blocks
2254                        self.auto_exec_buffer = blocks;
2255                        self.auto_exec_index = 0;
2256
2257                        // If no blocks, return None (empty response)
2258                        if self.auto_exec_buffer.is_empty() {
2259                            return Ok(None);
2260                        }
2261
2262                        // Return first buffered block
2263                        let block = self.auto_exec_buffer[0].clone();
2264                        self.auto_exec_index = 1;
2265                        return Ok(Some(block));
2266                    }
2267                    Err(e) => return Err(e),
2268                }
2269            }
2270
2271            // Buffer exhausted - clear it (as documented) so the next
                // send()/receive() cycle starts fresh instead of short-circuiting
                // on a stale, fully-consumed buffer.
                self.auto_exec_buffer.clear();
                self.auto_exec_index = 0;
2272            Ok(None)
2273        } else {
2274            // ====================================================================
2275            // MANUAL MODE
2276            // ====================================================================
2277            // Stream blocks to caller while accumulating them so we can add
2278            // the complete assistant message to history when the stream ends.
2279            match self.receive_one().await {
2280                Err(e) => {
2281                    // Stream error — discard partial output so a retry
2282                    // doesn't flush truncated blocks into history.
2283                    self.manual_receive_buffer.clear();
2284                    Err(e)
2285                }
2286                Ok(Some(block)) => {
2287                    self.manual_receive_buffer.push(block.clone());
2288                    Ok(Some(block))
2289                }
2290                Ok(None) => {
2291                    if self.interrupted.load(Ordering::SeqCst) && self.current_stream.is_some() {
2292                        // Interrupted a live stream — discard partial output.
2293                        // current_stream is still Some because receive_one()
2294                        // only clears it on natural EOF, not on interrupt.
2295                        self.current_stream = None;
2296                        self.manual_receive_buffer.clear();
2297                    } else if !self.manual_receive_buffer.is_empty() {
2298                        // Either natural EOF or interrupt after stream already
2299                        // finished — commit the (complete) assistant message.
2300                        let blocks = std::mem::take(&mut self.manual_receive_buffer);
2301                        self.history.push(Message::assistant(blocks));
2302                    }
2303                    Ok(None)
2304                }
2305            }
2306        }
2307    }
2308
2309    /// Interrupts the current operation by setting the interrupt flag.
2310    ///
2311    /// This method provides a thread-safe way to cancel any in-progress streaming
2312    /// operation. The interrupt flag is checked by `receive()` before each block,
2313    /// allowing responsive cancellation.
2314    ///
2315    /// # Behavior
2316    ///
2317    /// - Sets the atomic interrupt flag to `true`
2318    /// - Next `receive()` call observes the flag: manual mode returns `Ok(None)`
    ///   and clears the stream; auto mode surfaces an interruption error
2319    /// - Flag is automatically reset to `false` on next `send()` call
2320    /// - Safe to call from any thread (uses atomic operations)
2321    /// - Idempotent: calling multiple times has same effect as calling once
2322    /// - No-op if no operation is in progress
2323    ///
2324    /// # Thread Safety
2325    ///
2326    /// This method uses `Arc<AtomicBool>` internally, which can be safely shared
2327    /// across threads. You can clone the interrupt handle and use it from different
2328    /// threads or async tasks:
2329    ///
2330    /// ```rust,no_run
2331    /// # use open_agent::{Client, AgentOptions};
2332    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2333    /// let mut client = Client::new(AgentOptions::default())?;
2334    /// let interrupt_handle = client.interrupt_handle();
2335    ///
2336    /// // Use from another thread
2337    /// tokio::spawn(async move {
2338    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
2339    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
2340    /// });
2341    /// # Ok(())
2342    /// # }
2343    /// ```
2344    ///
2345    /// # State Changes
2346    ///
2347    /// - Sets `interrupted` flag to `true`
2348    /// - Does NOT modify stream, history, or other state directly
2349    /// - Effect takes place on next `receive()` call
2350    ///
2351    /// # Use Cases
2352    ///
2353    /// - User cancellation (e.g., stop button in UI)
2354    /// - Timeout enforcement
2355    /// - Resource cleanup
2356    /// - Emergency shutdown
2357    ///
2358    /// # Examples
2359    ///
2360    /// ## Basic Interruption
2361    ///
2362    /// ```rust,no_run
2363    /// use open_agent::{Client, AgentOptions};
2364    ///
2365    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2366    /// let mut client = Client::new(AgentOptions::default())?;
2367    ///
2368    /// client.send("Tell me a long story").await?;
2369    ///
2370    /// // Interrupt after receiving some blocks
2371    /// let mut count = 0;
2372    /// while let Some(block) = client.receive().await? {
2373    ///     count += 1;
2374    ///     if count >= 5 {
2375    ///         client.interrupt();
2376    ///     }
2377    /// }
2378    ///
2379    /// // Client is ready for new queries
2380    /// client.send("What's 2+2?").await?;
2381    /// # Ok(())
2382    /// # }
2383    /// ```
2384    ///
2385    /// ## With Timeout
2386    ///
2387    /// ```rust,no_run
2388    /// use open_agent::{Client, AgentOptions};
2389    /// use std::time::Duration;
2390    ///
2391    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2392    /// let mut client = Client::new(AgentOptions::default())?;
2393    ///
2394    /// client.send("Long request").await?;
2395    ///
2396    /// // Spawn timeout task
2397    /// let interrupt_handle = client.interrupt_handle();
2398    /// tokio::spawn(async move {
2399    ///     tokio::time::sleep(Duration::from_secs(10)).await;
2400    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
2401    /// });
2402    ///
2403    /// while let Some(_block) = client.receive().await? {
2404    ///     // Process until timeout
2405    /// }
2406    /// # Ok(())
2407    /// # }
2408    /// ```
2409    pub fn interrupt(&self) {
2410        // Set interrupt flag using SeqCst for immediate visibility across all threads
2411        self.interrupted.store(true, Ordering::SeqCst);
2412    }
2413
2414    /// Returns a clone of the interrupt handle for thread-safe cancellation.
2415    ///
2416    /// This method provides access to the shared `Arc<AtomicBool>` interrupt flag,
2417    /// allowing it to be used from other threads or async tasks to signal cancellation.
2418    ///
2419    /// # Returns
2420    ///
2421    /// A cloned `Arc<AtomicBool>` that can be used to interrupt operations from any thread.
2422    ///
2423    /// # Examples
2424    ///
2425    /// ```rust,no_run
2426    /// # use open_agent::{Client, AgentOptions};
2427    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2428    /// let mut client = Client::new(AgentOptions::default())?;
2429    /// let interrupt_handle = client.interrupt_handle();
2430    ///
2431    /// // Use from another thread
2432    /// tokio::spawn(async move {
2433    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
2434    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
2435    /// });
2436    /// # Ok(())
2437    /// # }
2438    /// ```
2439    pub fn interrupt_handle(&self) -> Arc<AtomicBool> {
2440        self.interrupted.clone()
2441    }
2442
2443    /// Returns a reference to the conversation history.
2444    ///
2445    /// The history contains all messages exchanged in the conversation, including:
2446    /// - User messages
2447    /// - Assistant messages (with text and tool use blocks)
2448    /// - Tool result messages
2449    ///
2450    /// # Returns
2451    ///
2452    /// A slice of `Message` objects in chronological order.
2453    ///
2454    /// # Use Cases
2455    ///
2456    /// - Inspecting conversation context
2457    /// - Debugging tool execution flow
2458    /// - Saving conversation state
2459    /// - Implementing custom history management
2460    ///
2461    /// # Examples
2462    ///
2463    /// ```rust
2464    /// # use open_agent::{Client, AgentOptions};
2465    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2466    /// let client = Client::new(AgentOptions::default())?;
2467    ///
2468    /// // Initially empty
2469    /// assert_eq!(client.history().len(), 0);
2470    /// # Ok(())
2471    /// # }
2472    /// ```
2473    pub fn history(&self) -> &[Message] {
2474        &self.history
2475    }
2476
2477    /// Returns a mutable reference to the conversation history.
2478    ///
2479    /// This allows you to modify the history directly for advanced use cases like:
2480    /// - Removing old messages to manage context length
2481    /// - Editing messages for retry scenarios
2482    /// - Injecting synthetic messages for testing
2483    ///
2484    /// # Warning
2485    ///
2486    /// Modifying history directly can lead to inconsistent conversation state if not
2487    /// done carefully. The SDK expects history to follow the proper message flow
2488    /// (user → assistant → tool results → assistant, etc.).
2489    ///
2490    /// # Examples
2491    ///
2492    /// ```rust
2493    /// # use open_agent::{Client, AgentOptions};
2494    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2495    /// let mut client = Client::new(AgentOptions::default())?;
2496    ///
2497    /// // Remove oldest messages to stay within context limit
2498    /// if client.history().len() > 50 {
2499    ///     client.history_mut().drain(0..10);
2500    /// }
2501    /// # Ok(())
2502    /// # }
2503    /// ```
2504    pub fn history_mut(&mut self) -> &mut Vec<Message> {
2505        &mut self.history
2506    }
2507
2508    /// Returns a reference to the agent configuration options.
2509    ///
2510    /// Provides read-only access to the `AgentOptions` used to configure this client.
2511    ///
2512    /// # Use Cases
2513    ///
2514    /// - Inspecting current configuration
2515    /// - Debugging issues
2516    /// - Conditional logic based on settings
2517    ///
2518    /// # Examples
2519    ///
2520    /// ```rust
2521    /// # use open_agent::{Client, AgentOptions};
2522    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2523    /// let client = Client::new(AgentOptions::builder()
2524    ///     .model("gpt-4")
2525    ///     .base_url("http://localhost:1234/v1")
2526    ///     .build()?)?;
2527    ///
2528    /// println!("Using model: {}", client.options().model());
2529    /// # Ok(())
2530    /// # }
2531    /// ```
2532    pub fn options(&self) -> &AgentOptions {
2533        &self.options
2534    }
2535
2536    /// Clears all conversation history.
2537    ///
2538    /// This resets the conversation to a blank slate while preserving the client
2539    /// configuration (tools, hooks, model, etc.). The next message will start a
2540    /// fresh conversation with no prior context.
2541    ///
2542    /// # State Changes
2543    ///
2544    /// - Clears the `history` vector and any buffered manual- or auto-mode blocks
2545    /// - Does NOT modify the current stream, options, or other client state
2546    ///
2547    /// # Use Cases
2548    ///
2549    /// - Starting a new conversation
2550    /// - Preventing context length issues
2551    /// - Clearing sensitive data
2552    /// - Implementing conversation sessions
2553    ///
2554    /// # Examples
2555    ///
2556    /// ```rust,no_run
2557    /// # use open_agent::{Client, AgentOptions, ContentBlock};
2558    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2559    /// let mut client = Client::new(AgentOptions::default())?;
2560    ///
2561    /// // First conversation
2562    /// client.send("Hello").await?;
2563    /// while let Some(_) = client.receive().await? {}
2564    ///
2565    /// // Clear and start fresh
2566    /// client.clear_history();
2567    ///
2568    /// // New conversation with no memory of previous
2569    /// client.send("Hello again").await?;
2570    /// # Ok(())
2571    /// # }
2572    /// ```
2573    pub fn clear_history(&mut self) {
2574        self.history.clear();
2575        self.manual_receive_buffer.clear();
            // Drop any unconsumed auto-execution blocks too; they hold text from
            // the conversation being cleared.
            self.auto_exec_buffer.clear();
            self.auto_exec_index = 0;
2576    }
2577
2578    /// Adds a tool result to the conversation history for manual tool execution.
2579    ///
2580    /// This method is used exclusively in **manual mode** after receiving a `ToolUseBlock`.
2581    /// The workflow is:
2582    ///
2583    /// 1. `receive()` returns a `ToolUseBlock`
2584    /// 2. You execute the tool yourself
2585    /// 3. Call `add_tool_result()` with the tool's output
2586    /// 4. Call `send("")` to continue the conversation
2587    /// 5. The model receives the tool result and generates a response
2588    ///
2589    /// # Parameters
2590    ///
2591    /// - `tool_use_id`: The unique ID from the `ToolUseBlock` (must match exactly)
2592    /// - `content`: The tool's output as a JSON value
2593    ///
2594    /// # Behavior
2595    ///
2596    /// Creates a `ToolResultBlock` and adds it to conversation history as a tool message.
2597    /// This preserves the tool call/result pairing that the model needs to understand
2598    /// the conversation flow.
2599    ///
2600    /// # State Changes
2601    ///
2602    /// - Appends a tool message to `history`
2603    /// - Does NOT modify stream or trigger any requests
2604    ///
2605    /// # Important Notes
2606    ///
2607    /// - **Not used in auto mode**: Auto-execution handles tool results automatically
2608    /// - **ID must match**: The `tool_use_id` must match the ID from the `ToolUseBlock`
    /// - **No validation**: This method doesn't validate the result format
    /// - **Must call send()**: After adding result(s), call `send("")` to continue
    ///
    /// # Examples
    ///
    /// ## Basic Manual Tool Execution
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Use the calculator").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     match block {
    ///         ContentBlock::ToolUse(tool_use) => {
    ///             // Execute tool manually
    ///             let result = json!({"result": 42});
    ///
    ///             // Add result to history
    ///             client.add_tool_result(tool_use.id(), result)?;
    ///
    ///             // Continue conversation to get model's response
    ///             client.send("").await?;
    ///         }
    ///         ContentBlock::Text(text) => {
    ///             println!("{}", text.text);
    ///         }
    ///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Handling Tool Errors
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// # client.send("test").await?;
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         // Try to execute tool
    ///         let result = match execute_tool(tool_use.name(), tool_use.input()) {
    ///             Ok(output) => output,
    ///             Err(e) => json!({
    ///                 "error": e.to_string(),
    ///                 "tool": tool_use.name()
    ///             })
    ///         };
    ///
    ///         client.add_tool_result(tool_use.id(), result)?;
    ///         client.send("").await?;
    ///     }
    /// }
    ///
    /// # fn execute_tool(name: &str, input: &serde_json::Value) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
    /// #     Ok(json!({}))
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Multiple Tool Calls
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Calculate 2+2 and 3+3").await?;
    ///
    /// let mut tool_calls = Vec::new();
    ///
    /// // Collect all tool calls
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         tool_calls.push(tool_use);
    ///     }
    /// }
    ///
    /// // Execute and add results for all tools
    /// for tool_call in tool_calls {
    ///     let result = json!({"result": 42}); // Execute tool
    ///     client.add_tool_result(tool_call.id(), result)?;
    /// }
    ///
    /// // Continue conversation
    /// client.send("").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn add_tool_result(&mut self, tool_use_id: &str, content: serde_json::Value) -> Result<()> {
        use crate::types::ToolResultBlock;

        // Flush any buffered manual-mode blocks first so the assistant's
        // tool_calls message appears in history before this tool result.
        if !self.manual_receive_buffer.is_empty() {
            let blocks = std::mem::take(&mut self.manual_receive_buffer);
            self.history.push(Message::assistant(blocks));
        }

        // Create a tool result block with the given ID and content
        let result_block = ToolResultBlock::new(tool_use_id, content);

        // Add to history as a tool message
        // Note: ToolResultBlock is properly serialized in build_api_request()
        // as a separate message with role="tool" and tool_call_id set
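        // Illustrative sketch only (the exact wire layout is owned by
        // build_api_request()): the entry pushed below is expected to reach
        // the API roughly as
        //   {"role": "tool", "tool_call_id": "<tool_use_id>", "content": "<serialized result>"}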
        let serialized = serde_json::to_string(result_block.content())
            .map_err(|e| Error::config(format!("Failed to serialize tool result: {}", e)))?;

        self.history.push(Message::new(
            MessageRole::Tool,
            vec![ContentBlock::Text(TextBlock::new(serialized))],
        ));

        Ok(())
    }

    /// Looks up a registered tool by name.
    ///
    /// This method provides access to the tool registry for manual execution scenarios.
    /// It searches the tools registered in `AgentOptions` and returns a reference to
    /// the matching tool if found.
    ///
    /// # Parameters
    ///
    /// - `name`: The tool name to search for (case-sensitive)
    ///
    /// # Returns
    ///
    /// - `Some(&Tool)`: Tool found
    /// - `None`: No tool with that name
    ///
    /// # Use Cases
    ///
    /// - Manual tool execution in response to `ToolUseBlock`
    /// - Validating tool availability before offering features
    /// - Inspecting tool metadata (name, description, schema)
    ///
    /// # Examples
    ///
    /// ## Execute Tool Manually
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// # client.send("test").await?;
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         if let Some(tool) = client.get_tool(tool_use.name()) {
    ///             // Execute the tool
    ///             let result = tool.execute(tool_use.input().clone()).await?;
    ///             client.add_tool_result(tool_use.id(), result)?;
    ///             client.send("").await?;
    ///         } else {
    ///             println!("Unknown tool: {}", tool_use.name());
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Check Tool Availability
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let client = Client::new(AgentOptions::default())?;
    /// if client.get_tool("calculator").is_some() {
    ///     println!("Calculator is available");
    /// }
    /// # Ok(())
    /// # }
    /// ```
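    ///
    /// ## Inspect Tool Metadata
    ///
    /// `get_tool` also supports read-only inspection. A minimal sketch that
    /// uses only the `name()` accessor this module already relies on:
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let client = Client::new(AgentOptions::default())?;
    /// if let Some(tool) = client.get_tool("calculator") {
    ///     // Borrow the tool to inspect it without executing anything
    ///     println!("Registered tool: {}", tool.name());
    /// }
    /// # Ok(())
    /// # }
    /// ```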
    pub fn get_tool(&self, name: &str) -> Option<&crate::tools::Tool> {
        // Search registered tools by name
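        // Note: lookup is a linear scan; an agent typically registers only a
        // handful of tools, so this is cheap and avoids keeping a separate
        // index in sync with the options.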
        self.options
            .tools()
            .iter()
            .find(|t| t.name() == name)
            .map(|t| t.as_ref())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_client_creation() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        assert_eq!(client.history().len(), 0);
    }

    #[test]
    fn test_client_new_returns_result() {
        // Test that Client::new() returns Result instead of panicking
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        // This should not panic - it should return Ok(client)
        let result = Client::new(options);
        assert!(result.is_ok(), "Client::new() should return Ok");

        let client = result.unwrap();
        assert_eq!(client.history().len(), 0);
    }

    #[test]
    fn test_interrupt_flag_initial_state() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        // Initially not interrupted
        assert!(!client.interrupted.load(Ordering::SeqCst));
    }

    #[test]
    fn test_interrupt_sets_flag() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));
    }

    #[test]
    fn test_interrupt_idempotent() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));

        // Call again - should still be interrupted
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_receive_returns_none_when_interrupted() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // Interrupt before receiving
        client.interrupt();

        // receive() should return Ok(None) when interrupted
        let result = client.receive().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_receive_returns_ok_none_when_no_stream() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // No stream started - receive() should return Ok(None)
        let result = client.receive().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_receive_error_propagation() {
        // Errors are wrapped in Err(...), not Some(Err(...)), so callers can
        // use the ? operator cleanly:
        //   while let Some(block) = client.receive().await? { ... }
        // The error path itself is exercised in
        // test_manual_mode_error_discards_buffer below.
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // With no active stream this resolves to Ok(None); the explicit type
        // annotation pins the documented signature at compile time.
        let result: Result<Option<ContentBlock>> = client.receive().await;
        assert!(matches!(result, Ok(None)));
    }

    #[tokio::test]
    async fn test_manual_mode_adds_assistant_to_history() {
        // Issue #4: Manual mode receive() should add assistant messages to history
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // Simulate a user message in history
        client
            .history
            .push(Message::user("What's the capital of France?"));

        // Inject a fake stream with two text blocks
        let blocks = vec![
            Ok(ContentBlock::Text(TextBlock::new("Paris is"))),
            Ok(ContentBlock::Text(TextBlock::new(
                " the capital of France.",
            ))),
        ];
        let stream = futures::stream::iter(blocks);
        client.current_stream = Some(Box::pin(stream));

        // Consume the stream via receive()
        let mut received = Vec::new();
        while let Some(block) = client.receive().await.unwrap() {
            received.push(block);
        }

        // Should have received 2 blocks
        assert_eq!(received.len(), 2);

        // History should now have 2 messages: user + assistant
        assert_eq!(client.history().len(), 2);
        assert_eq!(client.history()[0].role, MessageRole::User);
        assert_eq!(client.history()[1].role, MessageRole::Assistant);

        // Assistant message should contain both text blocks
        assert_eq!(client.history()[1].content.len(), 2);
    }

    #[tokio::test]
    async fn test_manual_mode_empty_stream_no_assistant_message() {
        // If no blocks are delivered, no assistant message should be added
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // No stream is set, so receive() returns None immediately
        let result = client.receive().await.unwrap();
        assert!(result.is_none());

        // History should remain empty — no spurious assistant message
        assert_eq!(client.history().len(), 0);
    }

    #[tokio::test]
    async fn test_manual_mode_tool_call_flushed_on_add_tool_result() {
        // P1: When receive() yields a ToolUseBlock and the caller then calls
        // add_tool_result(), the buffered assistant turn must be flushed to
        // history so the tool result follows a matching tool_calls message.
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // Simulate: user sends, receives a tool call block, stream ends mid-turn
        client.history.push(Message::user("Calculate 2+2"));

        let tool_use =
            crate::types::ToolUseBlock::new("call_1", "calculator", serde_json::json!({"a": 2}));
        let blocks = vec![Ok(ContentBlock::ToolUse(tool_use))];
        client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));

        // Caller consumes the tool use block
        let block = client.receive().await.unwrap().unwrap();
        assert!(matches!(block, ContentBlock::ToolUse(_)));

        // Buffer should hold the block but history should NOT have assistant yet
        assert_eq!(client.manual_receive_buffer.len(), 1);
        assert_eq!(client.history().len(), 1); // only user message

        // Note: the stream is never drained to EOF here, so the buffer still
        // holds the tool-call block when add_tool_result() runs below.

        // Caller adds tool result — this should flush the buffer first,
        // then add the tool result, giving correct ordering.
        client
            .add_tool_result("call_1", serde_json::json!({"result": 4}))
            .unwrap();

        // History should now be: user, assistant(tool_call), tool_result
        assert_eq!(client.history().len(), 3);
        assert_eq!(client.history()[0].role, MessageRole::User);
        assert_eq!(client.history()[1].role, MessageRole::Assistant);
        assert!(matches!(
            client.history()[1].content[0],
            ContentBlock::ToolUse(_)
        ));
        assert_eq!(client.history()[2].role, MessageRole::Tool);
        assert!(client.manual_receive_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_manual_mode_interrupt_discards_buffer() {
        // P2: Interrupted streams should NOT commit partial output to history
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        client.history.push(Message::user("Tell me a story"));

        // Inject a stream, consume one block, then interrupt
        let blocks = vec![
            Ok(ContentBlock::Text(TextBlock::new("Once upon"))),
            Ok(ContentBlock::Text(TextBlock::new(" a time..."))),
        ];
        client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));

        // Read one block
        let block = client.receive().await.unwrap().unwrap();
        assert!(matches!(block, ContentBlock::Text(_)));
        assert_eq!(client.manual_receive_buffer.len(), 1);

        // Interrupt mid-stream
        client.interrupt();

        // Next receive should return None and discard the buffer
        let result = client.receive().await.unwrap();
        assert!(result.is_none());

        // History should only have the user message — no partial assistant
        assert_eq!(client.history().len(), 1);
        assert_eq!(client.history()[0].role, MessageRole::User);
        assert!(client.manual_receive_buffer.is_empty());
    }

    #[tokio::test]
    async fn test_manual_mode_interrupt_after_eof_commits() {
        // P2 (round 2): If all blocks were delivered and the stream ended
        // naturally, an interrupt that fires before the next receive() should
        // still leave the complete response committed to history.
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");
        client.history.push(Message::user("Hello"));

        // Stream with one block
        let blocks = vec![Ok(ContentBlock::Text(TextBlock::new("Hi there!")))];
        client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));

        // Consume the block
        let block = client.receive().await.unwrap().unwrap();
        assert!(matches!(block, ContentBlock::Text(_)));

        // Consume EOF — stream ends normally, buffer committed
        let eof = client.receive().await.unwrap();
        assert!(eof.is_none());

        // Verify current_stream is None (natural EOF)
        assert!(client.current_stream.is_none());

        // Interrupt after the fact: the response was committed at EOF, so
        // this must not retroactively discard it
        client.interrupt();
        let post = client.receive().await.unwrap();
        assert!(post.is_none());

        // History: user + assistant
        assert_eq!(client.history().len(), 2);
        assert_eq!(client.history()[1].role, MessageRole::Assistant);
    }

    #[tokio::test]
    async fn test_manual_mode_send_discards_unfinished_stream() {
        // P1 (round 2): If the caller calls send() before the stream is
        // fully consumed, the partial buffer must be discarded, not committed.
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");
        client.history.push(Message::user("Tell me everything"));

        // Stream with many blocks — caller will only read the first
        let blocks = vec![
            Ok(ContentBlock::Text(TextBlock::new("First"))),
            Ok(ContentBlock::Text(TextBlock::new("Second"))),
            Ok(ContentBlock::Text(TextBlock::new("Third"))),
        ];
        client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));

        // Read only the first block
        let block = client.receive().await.unwrap().unwrap();
        assert!(matches!(block, ContentBlock::Text(_)));
        assert_eq!(client.manual_receive_buffer.len(), 1);

        // send() needs a live server, so this test can't drive the discard
        // through send() itself; instead it documents the expected post-send
        // state by applying the same discard manually.
        client.manual_receive_buffer.clear();
        client.current_stream = None;

        // History should only have the user message — no partial assistant
        assert_eq!(client.history().len(), 1);
    }

    #[tokio::test]
    async fn test_manual_mode_error_discards_buffer() {
        // P1: Stream errors should discard partial output, not leave it
        // to be flushed on the next send().
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");
        client.history.push(Message::user("Hello"));

        // Stream yields one block then errors
        let blocks: Vec<Result<ContentBlock>> = vec![
            Ok(ContentBlock::Text(TextBlock::new("Partial"))),
            Err(Error::stream("connection reset")),
        ];
        client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));

        // First receive succeeds
        let block = client.receive().await.unwrap().unwrap();
        assert!(matches!(block, ContentBlock::Text(_)));
        assert_eq!(client.manual_receive_buffer.len(), 1);

        // Second receive hits the error — buffer should be cleared
        let err = client.receive().await.unwrap_err();
        assert!(err.to_string().contains("connection reset"));
        assert!(client.manual_receive_buffer.is_empty());

        // History should only have the user message
        assert_eq!(client.history().len(), 1);
    }

    #[tokio::test]
    async fn test_clear_history_also_clears_manual_buffer() {
        // P2: clear_history() must also clear the manual buffer so a
        // "blank slate" conversation doesn't replay old assistant output.
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");
        client.history.push(Message::user("Hello"));

        // Inject stream, consume one block (buffer has content)
        let blocks = vec![Ok(ContentBlock::Text(TextBlock::new("Hi there")))];
        client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));
        client.receive().await.unwrap();
        assert_eq!(client.manual_receive_buffer.len(), 1);

        // Clear history — buffer must also be cleared
        client.clear_history();
        assert!(client.history().is_empty());
        assert!(client.manual_receive_buffer.is_empty());
    }

    #[test]
    fn test_empty_content_parts_protection() {
        // Issue #3: messages that contain images must produce non-empty
        // content_parts. This mirrors the conversion logic used when
        // building API requests.

        use crate::types::{ContentBlock, ImageBlock, Message, MessageRole};

        // GIVEN: Message with an image
        let img = ImageBlock::from_url("https://example.com/test.jpg").expect("Valid URL");

        let msg = Message::new(MessageRole::User, vec![ContentBlock::Image(img)]);

        // WHEN: Building content_parts
        let mut content_parts = Vec::new();
        for block in &msg.content {
            match block {
                ContentBlock::Text(text) => {
                    content_parts.push(crate::types::OpenAIContentPart::text(&text.text));
                }
                ContentBlock::Image(image) => {
                    content_parts.push(crate::types::OpenAIContentPart::from_image(image));
                }
                ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
            }
        }

        // THEN: content_parts should not be empty
        assert!(
            !content_parts.is_empty(),
            "Messages with images should produce non-empty content_parts"
        );
    }
}