//! Client for streaming queries and multi-turn conversations
//!
//! This module provides the core streaming client implementation for the Open Agent SDK.
//! It handles communication with OpenAI-compatible APIs, manages conversation history,
//! and provides two modes of operation: manual and automatic tool execution.
//!
//! # Architecture Overview
//!
//! The SDK implements a **streaming-first architecture** where all responses from the model
//! are received as a stream of content blocks. This design enables:
//!
//! - **Progressive rendering**: Display text as it arrives without waiting for completion
//! - **Real-time tool execution**: Execute tools as they're requested by the model
//! - **Interruption support**: Cancel operations mid-stream without corrupting state
//! - **Memory efficiency**: Process large responses without buffering everything in memory
//!
//! ## Two Operating Modes
//!
//! ### 1. Manual Tool Execution Mode (default)
//!
//! In manual mode, the client streams content blocks directly to the caller. When the model
//! requests a tool, the caller receives a `ToolUseBlock`, executes the tool, adds the result
//! using `add_tool_result()`, and continues the conversation with another `send()` call.
//!
//! **Use cases**: Custom tool execution logic, interactive debugging, fine-grained control
//!
//! ### 2. Automatic Tool Execution Mode
//!
//! When `auto_execute_tools` is enabled, the client automatically executes tools and continues
//! the conversation until receiving a text-only response. The caller only receives the final
//! text blocks after all tool iterations complete.
//!
//! **Use cases**: Simple agentic workflows, automated task completion, batch processing
//!
//! ## Request Flow
//!
//! ```text
//! User sends prompt
//!     │
//!     ├─> UserPromptSubmit hook executes (can modify/block prompt)
//!     │
//!     ├─> Prompt added to history
//!     │
//!     ├─> HTTP request to OpenAI-compatible API
//!     │
//!     ├─> Response streamed as Server-Sent Events (SSE)
//!     │
//!     ├─> SSE chunks aggregated into ContentBlocks
//!     │
//!     └─> Blocks emitted to caller (or buffered for auto-execution)
//! ```
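//!
//! A rough, std-only sketch of the SSE step in the flow above. It assumes the common
//! OpenAI-style framing, where each event is a `data: <payload>` line and the stream ends
//! with a `data: [DONE]` sentinel. `sse_payloads` is a hypothetical helper used only for
//! illustration; the SDK's own parser is `parse_sse_stream` and may behave differently:
//!
//! ```rust
//! /// Extracts the payload of every `data:` line, stopping at the `[DONE]` sentinel.
//! /// Hypothetical helper, not part of the SDK.
//! fn sse_payloads(raw: &str) -> Vec<&str> {
//!     raw.lines()
//!         // Keep only `data:` lines; blank separator lines are skipped.
//!         .filter_map(|line| line.strip_prefix("data:"))
//!         .map(str::trim)
//!         // Everything after the sentinel is ignored.
//!         .take_while(|payload| *payload != "[DONE]")
//!         .collect()
//! }
//! ```
//!
//! Each returned payload would then be deserialized as one JSON chunk before aggregation.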
//!
//! ## Tool Execution Flow
//!
//! ### Manual Mode:
//! ```text
//! receive() → ToolUseBlock
//!     │
//!     ├─> Caller executes tool
//!     │
//!     ├─> Caller calls add_tool_result()
//!     │
//!     ├─> Caller calls send("") to continue
//!     │
//!     └─> receive() → TextBlock (model's response)
//! ```
//!
//! ### Auto Mode:
//! ```text
//! receive() triggers auto-execution loop
//!     │
//!     ├─> Collect all blocks from stream
//!     │
//!     ├─> For each ToolUseBlock:
//!     │   ├─> PreToolUse hook executes (can modify/block)
//!     │   ├─> Tool executed via Tool.execute()
//!     │   ├─> PostToolUse hook executes (can modify result)
//!     │   └─> Result added to history
//!     │
//!     ├─> Continue conversation with send("")
//!     │
//!     ├─> Repeat until text-only response or max iterations
//!     │
//!     └─> Return text blocks one-by-one via receive()
//! ```
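//!
//! The control flow of the auto-mode loop can be sketched without any SDK types. Everything
//! in this block (`Block`, `run_auto_loop`, and the `model` and `run_tool` closures) is
//! hypothetical; hooks are left out, and returning only text once the iteration cap is hit
//! is an assumption of the sketch rather than documented SDK behavior:
//!
//! ```rust
//! #[derive(Debug, Clone, PartialEq)]
//! enum Block {
//!     Text(String),
//!     ToolUse { name: String },
//! }
//!
//! /// Calls `model` until it answers with text only or `max_iterations` tool rounds
//! /// have run. Tool results are fed back to the model through `history`.
//! fn run_auto_loop(
//!     mut model: impl FnMut(&[String]) -> Vec<Block>,
//!     mut run_tool: impl FnMut(&str) -> String,
//!     max_iterations: usize,
//! ) -> Vec<Block> {
//!     let mut history: Vec<String> = Vec::new();
//!     let mut iterations = 0;
//!     loop {
//!         // Collect the whole response before deciding what to do with it.
//!         let blocks = model(history.as_slice());
//!         let tool_names: Vec<String> = blocks
//!             .iter()
//!             .filter_map(|block| match block {
//!                 Block::ToolUse { name } => Some(name.clone()),
//!                 Block::Text(_) => None,
//!             })
//!             .collect();
//!
//!         // Text-only response or iteration cap reached: hand back the text blocks.
//!         if tool_names.is_empty() || iterations >= max_iterations {
//!             return blocks
//!                 .into_iter()
//!                 .filter(|block| matches!(block, Block::Text(_)))
//!                 .collect();
//!         }
//!
//!         // Run every requested tool and record its result for the next round.
//!         for name in &tool_names {
//!             history.push(run_tool(name.as_str()));
//!         }
//!         iterations += 1;
//!     }
//! }
//! ```
//!
//! With a `max_iterations` of `n`, this sketch calls the model at most `n + 1` times.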
//!
//! ## State Management
//!
//! The client maintains several pieces of state:
//!
//! - **history**: Full conversation history (`Vec<Message>`)
//! - **current_stream**: Active SSE stream being consumed (`Option<ContentStream>`)
//! - **interrupted**: Atomic flag for cancellation (`Arc<AtomicBool>`)
//! - **auto_exec_buffer**: Buffered blocks for auto-execution mode (`Vec<ContentBlock>`)
//! - **auto_exec_index**: Current position in buffer (`usize`)
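//!
//! How a buffer and a read index cooperate can be pictured with a small stand-alone sketch.
//! `BufferedReplay` is a hypothetical type, with `String` standing in for `ContentBlock`;
//! it is not the SDK's implementation:
//!
//! ```rust
//! /// Hypothetical stand-in for the `auto_exec_buffer` / `auto_exec_index` pair.
//! struct BufferedReplay {
//!     buffer: Vec<String>,
//!     index: usize,
//! }
//!
//! impl BufferedReplay {
//!     /// Replaces the buffer with a new response and rewinds the read position.
//!     fn refill(&mut self, blocks: Vec<String>) {
//!         self.buffer = blocks;
//!         self.index = 0;
//!     }
//!
//!     /// Returns the next buffered block, or `None` once all have been handed out.
//!     fn next_block(&mut self) -> Option<String> {
//!         let block = self.buffer.get(self.index).cloned()?;
//!         self.index += 1;
//!         Some(block)
//!     }
//! }
//! ```
//!
//! Returning `None` when the index reaches the end mirrors how `receive()` signals the end
//! of a response with `Ok(None)`.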
//!
//! ## Interruption Mechanism
//!
//! The interrupt system uses a shared `Arc<AtomicBool>` flag for thread-safe cancellation:
//!
//! ```rust,no_run
//! # use open_agent::{Client, AgentOptions};
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let mut client = Client::new(AgentOptions::default())?;
//! let handle = client.interrupt_handle(); // Clone Arc for use in other threads
//!
//! // In another thread or async task:
//! tokio::spawn(async move {
//!     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
//!     handle.store(true, std::sync::atomic::Ordering::SeqCst);
//! });
//!
//! client.send("Long request").await?;
//! while let Some(block) = client.receive().await? {
//!     // Will stop when interrupted
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Hook Integration
//!
//! Hooks provide extension points throughout the request lifecycle:
//!
//! - **UserPromptSubmit**: Called before sending user prompt (can modify or block)
//! - **PreToolUse**: Called before executing each tool (can modify input or block execution)
//! - **PostToolUse**: Called after tool execution (can modify result)
//!
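//! Hooks run only at the points shown in the flows above: `UserPromptSubmit` when a prompt
//! is sent, and `PreToolUse`/`PostToolUse` around each tool run in the auto-execution loop.
//! Each hook has access to the conversation history.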
//!
//! ## Error Handling
//!
//! Errors are propagated immediately and leave the client in a valid state:
//!
//! - **HTTP errors**: Network failures, timeouts, connection issues
//! - **API errors**: Invalid model, authentication failures, rate limits
//! - **Parse errors**: Malformed SSE responses, invalid JSON
//! - **Tool errors**: Tool execution failures (converted to JSON error responses)
//! - **Hook errors**: Hook execution failures or blocked operations
//!
//! After an error, the client remains usable for new requests.
//!
//! # Examples
//!
//! ## Simple Single-Turn Query
//!
//! ```rust,no_run
//! use open_agent::{query, AgentOptions};
//! use futures::StreamExt;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let options = AgentOptions::builder()
//!         .model("gpt-4")
//!         .api_key("sk-...")
//!         .build()?;
//!
//!     let mut stream = query("What is Rust?", &options).await?;
//!
//!     while let Some(block) = stream.next().await {
//!         if let open_agent::ContentBlock::Text(text) = block? {
//!             print!("{}", text.text);
//!         }
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Multi-Turn Conversation
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock};
//! use futures::StreamExt;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .build()?)?;
//!
//! // First question
//! client.send("What's the capital of France?").await?;
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//!
//! // Follow-up question (history is maintained)
//! client.send("What's its population?").await?;
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Manual Tool Execution
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
//! use serde_json::json;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let calculator = Tool::new(
//!     "calculator",
//!     "Performs arithmetic operations",
//!     json!({"type": "object", "properties": {"operation": {"type": "string"}}}),
//!     |input| Box::pin(async move {
//!         // Custom execution logic
//!         Ok(json!({"result": 42}))
//!     })
//! );
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .tools(vec![calculator])
//!     .build()?)?;
//!
//! client.send("Calculate 2+2").await?;
//!
//! while let Some(block) = client.receive().await? {
//!     match block {
//!         ContentBlock::ToolUse(tool_use) => {
//!             println!("Model wants to use: {}", tool_use.name());
//!
//!             // Execute tool manually
//!             let tool = client.get_tool(tool_use.name()).unwrap();
//!             let result = tool.execute(tool_use.input().clone()).await?;
//!
//!             // Add result and continue
//!             client.add_tool_result(tool_use.id(), result)?;
//!             client.send("").await?;
//!         }
//!         ContentBlock::Text(text) => {
//!             println!("Response: {}", text.text);
//!         }
//!         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Automatic Tool Execution
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
//! use serde_json::json;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let calculator = Tool::new(
//!     "calculator",
//!     "Performs arithmetic operations",
//!     json!({"type": "object"}),
//!     |input| Box::pin(async move { Ok(json!({"result": 42})) })
//! );
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .tools(vec![calculator])
//!     .auto_execute_tools(true)  // Enable auto-execution
//!     .max_tool_iterations(5)    // Max 5 tool rounds
//!     .build()?)?;
//!
//! client.send("Calculate 2+2 and then multiply by 3").await?;
//!
//! // Tools are executed automatically - you only get final text response
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## With Hooks
//!
//! ```ignore
//! use open_agent::{Client, AgentOptions, Hooks, HookDecision};
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let hooks = Hooks::new()
//!     .add_user_prompt_submit(|event| async move {
//!         // Block prompts containing certain words
//!         if event.prompt.contains("forbidden") {
//!             return Some(HookDecision::block("Forbidden word detected"));
//!         }
//!         Some(HookDecision::continue_())
//!     })
//!     .add_pre_tool_use(|event| async move {
//!         // Log all tool uses
//!         println!("Executing tool: {}", event.tool_name);
//!         Some(HookDecision::continue_())
//!     });
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .base_url("http://localhost:1234/v1")
//!     .hooks(hooks)
//!     .build()?)?;
//!
//! // Hooks will be executed automatically
//! client.send("Hello!").await?;
//! # Ok(())
//! # }
//! ```

use crate::types::{
    AgentOptions, ContentBlock, Message, MessageRole, OpenAIContent, OpenAIContentPart,
    OpenAIFunction, OpenAIMessage, OpenAIRequest, OpenAIToolCall, TextBlock,
};
use crate::utils::{ToolCallAggregator, parse_sse_stream};
use crate::{Error, Result};
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

/// A pinned, boxed stream of content blocks from the model.
///
/// This type alias represents an asynchronous stream that yields `ContentBlock` items.
/// Each item is wrapped in a `Result` to handle potential errors during streaming.
///
/// The stream is:
/// - **Pinned** (`Pin<Box<...>>`): Required for safe async operations and self-referential types
/// - **Boxed**: Allows dynamic dispatch and hides the concrete stream implementation
/// - **Send**: Can be safely transferred between threads
///
/// # Content Blocks
///
/// The stream can yield several types of content blocks:
///
/// - **TextBlock**: Incremental text responses from the model
/// - **ToolUseBlock**: Requests to execute a tool with specific parameters
/// - **ToolResultBlock**: Results from tool execution (in manual mode)
///
/// # Error Handling
///
/// Errors in the stream indicate issues like:
/// - Network failures or timeouts
/// - Malformed SSE events
/// - JSON parsing errors
/// - API errors from the model provider
///
/// When an error occurs, the stream typically terminates. It's the caller's responsibility
/// to handle errors appropriately.
///
/// # Examples
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions, ContentBlock};
/// use futures::StreamExt;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .build()?;
///
/// let mut stream = query("Hello!", &options).await?;
///
/// while let Some(result) = stream.next().await {
///     match result {
///         Ok(ContentBlock::Text(text)) => print!("{}", text.text),
///         Ok(_) => {} // Other block types
///         Err(e) => {
///             // Stop polling: the stream typically terminates after an error
///             eprintln!("Stream error: {}", e);
///             break;
///         }
///     }
/// }
/// # Ok(())
/// # }
/// ```
pub type ContentStream = Pin<Box<dyn Stream<Item = Result<ContentBlock>> + Send>>;

/// Simple query function for single-turn interactions without conversation history.
///
/// This is a stateless convenience function for simple queries that don't require
/// multi-turn conversations. It creates a temporary HTTP client, sends a single
/// prompt, and returns a stream of content blocks.
///
/// For multi-turn conversations or more control over the interaction, use [`Client`] instead.
///
/// # Parameters
///
/// - `prompt`: The user's message to send to the model
/// - `options`: Configuration including model, API key, tools, etc.
///
/// # Returns
///
/// Returns a `ContentStream` that yields content blocks as they arrive from the model.
/// The stream must be polled to completion to receive all blocks.
///
/// # Behavior
///
/// 1. Creates a temporary HTTP client with configured timeout
/// 2. Builds message array (system prompt + user prompt)
/// 3. Converts tools to OpenAI format if provided
/// 4. Makes HTTP POST request to `/chat/completions`
/// 5. Parses Server-Sent Events (SSE) response stream
/// 6. Aggregates chunks into complete content blocks
/// 7. Returns stream that yields blocks as they complete
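///
/// Steps 6 and 7 follow a scan-then-flatten shape. The sketch below mirrors that shape with
/// std iterators instead of async streams. `aggregate` and its newline-delimited "blocks"
/// are hypothetical stand-ins for `ToolCallAggregator` and `ContentBlock`, chosen only to
/// keep the example self-contained:
///
/// ```rust
/// /// Buffers partial chunks and emits a block each time a `\n` completes one.
/// /// Hypothetical helper, not part of the SDK.
/// fn aggregate<'a>(chunks: impl IntoIterator<Item = &'a str>) -> Vec<String> {
///     chunks
///         .into_iter()
///         // `scan` carries the partially assembled text from one chunk to the next.
///         .scan(String::new(), |pending, chunk| {
///             pending.push_str(chunk);
///             let mut done = Vec::new();
///             while let Some(pos) = pending.find('\n') {
///                 done.push(pending[..pos].to_string());
///                 pending.drain(..=pos);
///             }
///             // An empty `Vec` means "intermediate chunk, keep streaming".
///             Some(done)
///         })
///         // Expand each batch of completed blocks into individual items.
///         .flatten()
///         .collect()
/// }
/// ```
///
/// Text that never receives its terminating `\n` stays in the scan state and is not
/// emitted, much as an incomplete tool call is held back until its JSON is complete.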
///
/// # Error Handling
///
/// This function can return errors for:
/// - HTTP client creation failures
/// - Network errors during the request
/// - API errors (authentication, invalid model, rate limits, etc.)
/// - SSE parsing errors
/// - JSON deserialization errors
///
/// # Performance Notes
///
/// - Creates a new HTTP client for each call (consider using `Client` for repeated queries)
/// - Timeout is configurable via `AgentOptions::timeout` (default: 120 seconds)
/// - Streaming begins immediately; no buffering of the full response
///
/// # Examples
///
/// ## Basic Usage
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions};
/// use futures::StreamExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let options = AgentOptions::builder()
///         .system_prompt("You are a helpful assistant")
///         .model("gpt-4")
///         .api_key("sk-...")
///         .build()?;
///
///     let mut stream = query("What's the capital of France?", &options).await?;
///
///     while let Some(block) = stream.next().await {
///         match block? {
///             open_agent::ContentBlock::Text(text) => {
///                 print!("{}", text.text);
///             }
///             open_agent::ContentBlock::ToolUse(_)
///             | open_agent::ContentBlock::ToolResult(_)
///             | open_agent::ContentBlock::Image(_) => {}
///         }
///     }
///
///     Ok(())
/// }
/// ```
///
/// ## With Tools
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions, Tool, ContentBlock};
/// use futures::StreamExt;
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs calculations",
///     json!({"type": "object"}),
///     |input| Box::pin(async move { Ok(json!({"result": 42})) })
/// );
///
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .build()?;
///
/// let mut stream = query("Calculate 2+2", &options).await?;
///
/// while let Some(block) = stream.next().await {
///     match block? {
///         ContentBlock::ToolUse(tool_use) => {
///             println!("Model wants to use: {}", tool_use.name());
///             // Note: You'll need to manually execute tools and continue
///             // the conversation. For automatic execution, use Client.
///         }
///         ContentBlock::Text(text) => print!("{}", text.text),
///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Error Handling
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions};
/// use futures::StreamExt;
///
/// # async fn example() {
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("invalid-key")
///     .build()
///     .unwrap();
///
/// match query("Hello", &options).await {
///     Ok(mut stream) => {
///         while let Some(result) = stream.next().await {
///             match result {
///                 Ok(block) => println!("Block: {:?}", block),
///                 Err(e) => {
///                     eprintln!("Stream error: {}", e);
///                     break;
///                 }
///             }
///         }
///     }
///     Err(e) => eprintln!("Query failed: {}", e),
/// }
/// # }
/// ```
pub async fn query(prompt: &str, options: &AgentOptions) -> Result<ContentStream> {
    // Create HTTP client with configured timeout
    // The timeout applies to the entire request, not individual chunks
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(options.timeout()))
        .build()
        .map_err(Error::Http)?;

    // Build messages array for the API request
    // OpenAI format expects an array of message objects with role and content
    let mut messages = Vec::new();

    // Add system prompt if provided
    // System prompts set the assistant's behavior and context
    if !options.system_prompt().is_empty() {
        messages.push(OpenAIMessage {
            role: "system".to_string(),
            content: Some(OpenAIContent::Text(options.system_prompt().to_string())),
            tool_calls: None,
            tool_call_id: None,
        });
    }

    // Add user prompt
    // This is the actual query from the user
    messages.push(OpenAIMessage {
        role: "user".to_string(),
        content: Some(OpenAIContent::Text(prompt.to_string())),
        tool_calls: None,
        tool_call_id: None,
    });

    // Convert tools to OpenAI format if any are provided
    // Tools are described using JSON Schema for parameter validation
    let tools = if !options.tools().is_empty() {
        Some(
            options
                .tools()
                .iter()
                .map(|t| t.to_openai_format())
                .collect(),
        )
    } else {
        None
    };

    // Build the OpenAI-compatible request payload
    // stream=true enables Server-Sent Events for incremental responses
    let request = OpenAIRequest {
        model: options.model().to_string(),
        messages,
        stream: true, // Critical: enables SSE streaming
        max_tokens: options.max_tokens(),
        temperature: Some(options.temperature()),
        tools,
    };

    // Make HTTP POST request to the chat completions endpoint
    let url = format!("{}/chat/completions", options.base_url());
    let response = client
        .post(&url)
        .header("Authorization", format!("Bearer {}", options.api_key()))
        .header("Content-Type", "application/json")
        .json(&request)
        .send()
        .await
        .map_err(Error::Http)?;

    // Check for HTTP-level errors before processing the stream
    // This catches authentication failures, rate limits, invalid models, etc.
    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await.unwrap_or_else(|e| {
            eprintln!("WARNING: Failed to read error response body: {}", e);
            "Unknown error (failed to read response body)".to_string()
        });
        return Err(Error::api(format!("API error {}: {}", status, body)));
    }

    // Parse the Server-Sent Events (SSE) stream
    // The response body is a stream of "data: {...}" events
    let sse_stream = parse_sse_stream(response);

    // Aggregate SSE chunks into complete content blocks
    // ToolCallAggregator handles partial JSON and assembles complete tool calls
    // The scan() combinator maintains state across stream items
    let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
        let result = match chunk_result {
            Ok(chunk) => match aggregator.process_chunk(chunk) {
                Ok(blocks) => {
                    if blocks.is_empty() {
                        Some(None) // Intermediate chunk, continue streaming
                    } else {
                        Some(Some(Ok(blocks))) // Complete block(s) ready
                    }
                }
                Err(e) => Some(Some(Err(e))), // Propagate processing error
            },
            Err(e) => Some(Some(Err(e))), // Propagate stream error
        };
        futures::future::ready(result)
    });

    // Flatten the stream to emit individual blocks
    // filter_map removes None values (incomplete chunks)
    // flat_map expands Vec<ContentBlock> into individual items
    let flattened = stream
        .filter_map(|item| async move { item })
        .flat_map(|result| {
            futures::stream::iter(match result {
                Ok(blocks) => blocks.into_iter().map(Ok).collect(),
                Err(e) => vec![Err(e)],
            })
        });

    // Pin and box the stream for type erasure and safe async usage
    Ok(Box::pin(flattened))
}

/// Stateful client for multi-turn conversations with automatic history management.
///
/// The `Client` is the primary interface for building conversational AI applications.
/// It maintains conversation history, manages streaming responses, and provides two
/// modes of operation: manual and automatic tool execution.
///
/// # State Management
///
/// The client maintains several pieces of state that persist across multiple turns:
///
/// - **Conversation History**: Complete record of all messages exchanged
/// - **Active Stream**: Currently active SSE stream being consumed
/// - **Interrupt Flag**: Thread-safe cancellation signal
/// - **Auto-Execution Buffer**: Cached blocks for auto-execution mode
///
/// # Operating Modes
///
/// ## Manual Mode (default)
///
/// In manual mode, the client streams blocks directly to the caller. When the model
/// requests a tool, you receive a `ToolUseBlock`, execute the tool yourself, add the
/// result with `add_tool_result()`, and continue the conversation.
///
/// **Advantages**:
/// - Full control over tool execution
/// - Custom error handling per tool
/// - Ability to modify tool inputs/outputs
/// - Interactive debugging capabilities
///
/// ## Automatic Mode (`auto_execute_tools = true`)
///
/// In automatic mode, the client executes tools transparently and only returns the
/// final text response after all tool iterations complete.
///
/// **Advantages**:
/// - Simpler API for common use cases
/// - Built-in retry logic via hooks
/// - Automatic conversation continuation
/// - Configurable iteration limits
///
/// # Thread Safety
///
/// The client is NOT thread-safe for concurrent use. However, the interrupt mechanism
/// uses `Arc<AtomicBool>` which can be safely shared across threads to signal cancellation.
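///
/// A std-only sketch of that split: one caller owns the receive loop while any other thread
/// only flips the shared flag. `drain_until_interrupted` is a hypothetical stand-in for a
/// `receive()` loop and is not part of the SDK:
///
/// ```rust
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// /// Collects items until the source is exhausted or the flag has been set.
/// /// Hypothetical helper that mimics checking the interrupt flag on each step.
/// fn drain_until_interrupted(
///     items: impl IntoIterator<Item = u32>,
///     interrupted: &AtomicBool,
/// ) -> Vec<u32> {
///     let mut seen = Vec::new();
///     for item in items {
///         // The flag is only read here; writers never touch `seen`.
///         if interrupted.load(Ordering::SeqCst) {
///             break;
///         }
///         seen.push(item);
///     }
///     seen
/// }
/// ```
///
/// Because the flag is the only shared state, the loop's owner needs no lock, and a writer
/// needs nothing beyond a cloned `Arc<AtomicBool>`.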
///
/// # Memory Management
///
/// - History grows unbounded by default (call `clear_history()` to reset it in long sessions)
/// - Streams are consumed lazily (low memory footprint during streaming)
/// - Auto-execution buffers entire response (higher memory in auto mode)
///
/// # Examples
///
/// ## Basic Multi-Turn Conversation
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .build()?)?;
///
/// // First question
/// client.send("What's the capital of France?").await?;
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "Paris is the capital of France."
///     }
/// }
///
/// // Follow-up question - history is automatically maintained
/// client.send("What's its population?").await?;
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "Paris has approximately 2.2 million people."
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Manual Tool Execution
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs arithmetic",
///     json!({"type": "object"}),
///     |input| Box::pin(async move { Ok(json!({"result": 42})) })
/// );
///
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .build()?)?;
///
/// client.send("What's 2+2?").await?;
///
/// while let Some(block) = client.receive().await? {
///     match block {
///         ContentBlock::ToolUse(tool_use) => {
///             // Execute tool manually
///             let result = json!({"result": 4});
///             client.add_tool_result(tool_use.id(), result)?;
///
///             // Continue conversation to get model's response
///             client.send("").await?;
///         }
///         ContentBlock::Text(text) => {
///             println!("{}", text.text); // "The result is 4."
///         }
///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Automatic Tool Execution
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs arithmetic",
///     json!({"type": "object"}),
///     |input| Box::pin(async move { Ok(json!({"result": 42})) })
/// );
///
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .auto_execute_tools(true)  // Enable auto-execution
///     .build()?)?;
///
/// client.send("What's 2+2?").await?;
///
/// // Tools execute automatically - you only receive final text
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "The result is 4."
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## With Interruption
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions};
/// use std::time::Duration;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::new(AgentOptions::default())?;
///
/// // Start a long-running query
/// client.send("Write a very long story").await?;
///
/// // Spawn a task to interrupt after timeout
/// let interrupt_handle = client.interrupt_handle();
/// tokio::spawn(async move {
///     tokio::time::sleep(Duration::from_secs(5)).await;
///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
/// });
///
/// // This loop will stop when interrupted
/// while let Some(block) = client.receive().await? {
///     // Process blocks...
/// }
///
/// // Client is still usable after interruption
/// client.send("What's 2+2?").await?;
/// # Ok(())
/// # }
/// ```
pub struct Client {
    /// Configuration options including model, API key, tools, hooks, etc.
    ///
    /// This field contains all the settings that control how the client behaves.
    /// It's set once during construction and cannot be modified (though you can
    /// access it via `options()` for inspection).
    options: AgentOptions,

    /// Complete conversation history as a sequence of messages.
    ///
    /// Each message contains a role (System/User/Assistant/Tool) and content blocks.
    /// History grows unbounded by default - use `clear_history()` to reset.
    ///
    /// **Important**: The history includes ALL messages, not just user/assistant.
    /// This includes tool results and intermediate assistant messages from tool calls.
    history: Vec<Message>,

    /// Currently active SSE stream being consumed.
    ///
    /// This is `Some(stream)` while a response is being received, and `None` when
    /// no request is in flight or after a response completes.
    ///
    /// The stream is set by `send()` and consumed by `receive()`. When the stream
    /// is exhausted, `receive()` returns `Ok(None)` and sets this back to `None`.
    current_stream: Option<ContentStream>,

    /// Reusable HTTP client for making API requests.
    ///
    /// Configured once during construction with the timeout from `AgentOptions`.
    /// Reusing the same client across requests enables connection pooling and
    /// better performance for multi-turn conversations.
    http_client: reqwest::Client,

    /// Thread-safe interrupt flag for cancellation.
    ///
    /// This `Arc<AtomicBool>` can be cloned and shared across threads or async tasks
    /// to signal cancellation. When set to `true`, the next `receive()` call will
    /// return `Ok(None)` and clear the current stream.
    ///
    /// The flag is automatically reset to `false` at the start of each `send()` call.
    ///
    /// **Thread Safety**: Can be safely accessed from multiple threads using atomic
    /// operations. However, only one thread should call `send()`/`receive()`.
    interrupted: Arc<AtomicBool>,

    /// Buffer of content blocks for auto-execution mode.
    ///
    /// When `auto_execute_tools` is enabled, `receive()` internally calls the
    /// auto-execution loop which buffers all final text blocks here. Subsequent
    /// calls to `receive()` return blocks from this buffer one at a time.
    ///
    /// **Only used when `options.auto_execute_tools == true`**.
    ///
    /// The buffer is cleared when starting a new auto-execution loop.
    auto_exec_buffer: Vec<ContentBlock>,

    /// Current read position in the auto-execution buffer.
    ///
    /// Tracks which block to return next when `receive()` is called in auto mode.
    /// Reset to 0 when the buffer is refilled with a new response.
    ///
    /// **Only used when `options.auto_execute_tools == true`**.
    auto_exec_index: usize,
}

impl Client {
    /// Creates a new client with the specified configuration.
    ///
    /// This constructor initializes all state fields and creates a reusable HTTP client
    /// configured with the timeout from `AgentOptions`.
    ///
    /// # Parameters
    ///
    /// - `options`: Configuration including model, API key, tools, hooks, etc.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot be built. This can happen due to:
    /// - Invalid TLS configuration
    /// - System resource exhaustion
911    /// - Invalid timeout values
912    ///
913    /// # Examples
914    ///
915    /// ```rust
916    /// use open_agent::{Client, AgentOptions};
917    ///
918    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
919    /// let client = Client::new(AgentOptions::builder()
920    ///     .model("gpt-4")
921    ///     .base_url("http://localhost:1234/v1")
922    ///     .build()?)?;
923    /// # Ok(())
924    /// # }
925    /// ```
926    pub fn new(options: AgentOptions) -> Result<Self> {
927        // Build HTTP client with configured timeout
928        // This client is reused across all requests for connection pooling
929        let http_client = reqwest::Client::builder()
930            .timeout(Duration::from_secs(options.timeout()))
931            .build()
932            .map_err(|e| Error::config(format!("Failed to build HTTP client: {}", e)))?;
933
934        Ok(Self {
935            options,
936            history: Vec::new(),  // Empty conversation history
937            current_stream: None, // No active stream yet
938            http_client,
939            interrupted: Arc::new(AtomicBool::new(false)), // Not interrupted initially
940            auto_exec_buffer: Vec::new(),                  // Empty buffer for auto mode
941            auto_exec_index: 0,                            // Start at beginning of buffer
942        })
943    }
944
945    /// Sends a user message and initiates streaming of the model's response.
946    ///
947    /// This method performs several critical steps:
948    ///
949    /// 1. Executes UserPromptSubmit hooks (which can modify or block the prompt)
950    /// 2. Adds the user message to conversation history
951    /// 3. Builds and sends HTTP request to the OpenAI-compatible API
952    /// 4. Parses the SSE stream and sets up aggregation
953    /// 5. Stores the stream for consumption via `receive()`
954    ///
955    /// # Parameters
956    ///
957    /// - `prompt`: The user's message. Can be empty to continue conversation after
958    ///   adding tool results (common pattern in manual tool execution mode).
959    ///
960    /// # Returns
961    ///
962    /// - `Ok(())`: Request sent successfully, call `receive()` to get blocks
963    /// - `Err(e)`: Request failed (network error, API error, hook blocked, etc.)
964    ///
965    /// # Behavior Details
966    ///
967    /// ## Hook Execution
968    ///
969    /// Before sending, UserPromptSubmit hooks are executed. Hooks can:
970    /// - Modify the prompt text
971    /// - Block the request entirely
972    /// - Access conversation history
973    ///
974    /// If a hook blocks the request, this method returns an error immediately.
975    ///
976    /// ## History Management
977    ///
    /// The prompt is added to history BEFORE the request is sent, but only after
    /// hooks have run, so a blocked prompt is never recorded. If the request itself
    /// fails, the user message stays in history without a matching response.
980    ///
981    /// ## Stream Setup
982    ///
983    /// The response stream is set up but not consumed. You must call `receive()`
984    /// repeatedly to get content blocks. The stream remains active until:
985    /// - All blocks are consumed (stream naturally ends)
986    /// - An error occurs
987    /// - Interrupt is triggered
988    ///
989    /// ## Interrupt Handling
990    ///
991    /// The interrupt flag is reset to `false` at the start of this method,
992    /// allowing a fresh request after a previous interruption.
993    ///
994    /// # State Changes
995    ///
996    /// - Resets `interrupted` flag to `false`
997    /// - Appends user message to `history`
998    /// - Sets `current_stream` to new SSE stream
999    /// - Does NOT modify `auto_exec_buffer` or `auto_exec_index`
1000    ///
1001    /// # Errors
1002    ///
1003    /// Returns errors for:
1004    /// - Hook blocking the prompt
1005    /// - HTTP client errors (network failure, DNS, etc.)
1006    /// - API errors (auth failure, invalid model, rate limits)
1007    /// - Invalid response format
1008    ///
1009    /// After an error, the client remains usable for new requests.
1010    ///
1011    /// # Examples
1012    ///
1013    /// ## Basic Usage
1014    ///
1015    /// ```rust,no_run
1016    /// # use open_agent::{Client, AgentOptions};
1017    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1018    /// # let mut client = Client::new(AgentOptions::default())?;
1019    /// client.send("Hello!").await?;
1020    ///
1021    /// while let Some(block) = client.receive().await? {
1022    ///     // Process blocks...
1023    /// }
1024    /// # Ok(())
1025    /// # }
1026    /// ```
1027    ///
1028    /// ## Continuing After Tool Result
1029    ///
1030    /// ```rust,no_run
1031    /// # use open_agent::{Client, AgentOptions, ContentBlock};
1032    /// # use serde_json::json;
1033    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1034    /// # let mut client = Client::new(AgentOptions::default())?;
1035    /// client.send("Use the calculator").await?;
1036    ///
1037    /// while let Some(block) = client.receive().await? {
1038    ///     if let ContentBlock::ToolUse(tool_use) = block {
1039    ///         // Execute tool and add result
1040    ///         client.add_tool_result(tool_use.id(), json!({"result": 42}))?;
1041    ///
1042    ///         // Continue conversation with empty prompt
1043    ///         client.send("").await?;
1044    ///     }
1045    /// }
1046    /// # Ok(())
1047    /// # }
1048    /// ```
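    ///
    /// ## Chunk Aggregation Sketch
    ///
    /// The stream setup above relies on a stateful aggregator that swallows partial
    /// chunks and emits only complete blocks. The following is a minimal synchronous
    /// sketch of that idea using `std` iterators rather than the async stream this
    /// method builds. `Aggregator`, `process`, `aggregate`, and the newline-terminated
    /// framing are hypothetical stand-ins, not the SDK's `ToolCallAggregator`.
    ///
    /// ```rust
    /// // Hypothetical aggregator: buffers fragments and emits a line
    /// // only once its terminating '\n' has arrived.
    /// #[derive(Default)]
    /// struct Aggregator {
    ///     pending: String,
    /// }
    ///
    /// impl Aggregator {
    ///     // Returns every line completed by this chunk (possibly none).
    ///     fn process(&mut self, chunk: &str) -> Vec<String> {
    ///         self.pending.push_str(chunk);
    ///         let mut done = Vec::new();
    ///         while let Some(pos) = self.pending.find('\n') {
    ///             let line: String = self.pending.drain(..=pos).collect();
    ///             done.push(line.trim_end().to_string());
    ///         }
    ///         done
    ///     }
    /// }
    ///
    /// // `scan` threads the aggregator through the chunks; `flatten` drops the
    /// // empty results from partial chunks and yields completed items one by one.
    /// fn aggregate<'a>(chunks: impl IntoIterator<Item = &'a str>) -> Vec<String> {
    ///     chunks
    ///         .into_iter()
    ///         .scan(Aggregator::default(), |agg, chunk| Some(agg.process(chunk)))
    ///         .flatten()
    ///         .collect()
    /// }
    /// ```
    ///
    /// Feeding `aggregate` the fragments `"he"`, `"llo\nwor"`, `"ld\n"` yields the two
    /// complete lines `"hello"` and `"world"`; the first fragment alone yields nothing.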
1049    pub async fn send(&mut self, prompt: &str) -> Result<()> {
1050        use crate::hooks::UserPromptSubmitEvent;
1051
1052        // Reset interrupt flag for new query
1053        // This allows the client to be reused after a previous interruption
1054        // Uses SeqCst ordering to ensure visibility across all threads
1055        self.interrupted.store(false, Ordering::SeqCst);
1056
1057        // Execute UserPromptSubmit hooks
1058        // Hooks run BEFORE adding to history, allowing modification or blocking
1059        let mut final_prompt = prompt.to_string();
1060        let history_snapshot: Vec<serde_json::Value> = self
1061            .history
1062            .iter()
1063            .map(|_| serde_json::json!({})) // Simplified snapshot for hooks
1064            .collect();
1065
1066        // Create hook event with current prompt and history
1067        let event = UserPromptSubmitEvent::new(final_prompt.clone(), history_snapshot);
1068
1069        // Execute all registered UserPromptSubmit hooks
1070        if let Some(decision) = self.options.hooks().execute_user_prompt_submit(event).await {
1071            // Check if hook wants to block execution
1072            if !decision.continue_execution() {
1073                return Err(Error::other(format!(
1074                    "Prompt blocked by hook: {}",
1075                    decision.reason().unwrap_or("")
1076                )));
1077            }
1078            // Apply any prompt modifications from hooks
1079            if let Some(modified) = decision.modified_prompt() {
1080                final_prompt = modified.to_string();
1081            }
1082        }
1083
1084        // Add user message to history BEFORE sending request
1085        // This ensures history consistency even if request fails
1086        // Empty prompts are still added (needed for tool continuation)
1087        self.history.push(Message::user(final_prompt));
1088
1089        // Build messages array for API request
1090        // This includes system prompt + full conversation history
1091        let mut messages = Vec::new();
1092
1093        // Add system prompt as first message if configured
1094        // System prompts are added fresh for each request (not from history)
1095        if !self.options.system_prompt().is_empty() {
1096            messages.push(OpenAIMessage {
1097                role: "system".to_string(),
1098                content: Some(OpenAIContent::Text(
1099                    self.options.system_prompt().to_string(),
1100                )),
1101                tool_calls: None,
1102                tool_call_id: None,
1103            });
1104        }
1105
1106        // Convert conversation history to OpenAI message format
1107        // This includes user prompts, assistant responses, and tool results
1108        for msg in &self.history {
1109            // Separate blocks by type to determine message structure
1110            let mut text_blocks = Vec::new();
1111            let mut image_blocks = Vec::new();
1112            let mut tool_use_blocks = Vec::new();
1113            let mut tool_result_blocks = Vec::new();
1114
1115            for block in &msg.content {
1116                match block {
1117                    ContentBlock::Text(text) => text_blocks.push(text),
1118                    ContentBlock::Image(image) => image_blocks.push(image),
1119                    ContentBlock::ToolUse(tool_use) => tool_use_blocks.push(tool_use),
1120                    ContentBlock::ToolResult(tool_result) => tool_result_blocks.push(tool_result),
1121                }
1122            }
1123
1124            // Handle different message types based on content blocks
1125            // Case 1: Message contains tool results (should be separate tool messages)
1126            if !tool_result_blocks.is_empty() {
1127                for tool_result in tool_result_blocks {
                    // Serialize the tool result content as JSON string
                    // The fallback is built with json! so the error text is escaped correctly
                    let content =
                        serde_json::to_string(tool_result.content()).unwrap_or_else(|e| {
                            serde_json::json!({ "error": format!("Failed to serialize: {}", e) })
                                .to_string()
                        });
1133
1134                    messages.push(OpenAIMessage {
1135                        role: "tool".to_string(),
1136                        content: Some(OpenAIContent::Text(content)),
1137                        tool_calls: None,
1138                        tool_call_id: Some(tool_result.tool_use_id().to_string()),
1139                    });
1140                }
1141            }
1142            // Case 2: Message contains tool use blocks (assistant with tool calls)
1143            else if !tool_use_blocks.is_empty() {
1144                // Build tool_calls array
1145                let tool_calls: Vec<OpenAIToolCall> = tool_use_blocks
1146                    .iter()
1147                    .map(|tool_use| {
1148                        // Serialize the input as a JSON string (OpenAI API requirement)
1149                        let arguments = serde_json::to_string(tool_use.input())
1150                            .unwrap_or_else(|_| "{}".to_string());
1151
1152                        OpenAIToolCall {
1153                            id: tool_use.id().to_string(),
1154                            call_type: "function".to_string(),
1155                            function: OpenAIFunction {
1156                                name: tool_use.name().to_string(),
1157                                arguments,
1158                            },
1159                        }
1160                    })
1161                    .collect();
1162
                // Extract any text content (some models include reasoning before tool calls)
                // Note: OpenAI's API allows `content` to be omitted when `tool_calls` is present;
                // an empty string is sent instead so the field is always populated
                let content = if !text_blocks.is_empty() {
                    let text = text_blocks
                        .iter()
                        .map(|t| t.text.as_str())
                        .collect::<Vec<_>>()
                        .join("\n");
                    Some(OpenAIContent::Text(text))
                } else {
                    // No text blocks: fall back to an empty string rather than `None`
                    Some(OpenAIContent::Text(String::new()))
                };
1176
1177                messages.push(OpenAIMessage {
1178                    role: "assistant".to_string(),
1179                    content,
1180                    tool_calls: Some(tool_calls),
1181                    tool_call_id: None,
1182                });
1183            }
1184            // Case 3: Message contains images (use OpenAIContent::Parts)
1185            else if !image_blocks.is_empty() {
1186                // Log debug info about images being serialized
1187                log::debug!(
1188                    "Serializing message with {} image(s) for {:?} role",
1189                    image_blocks.len(),
1190                    msg.role
1191                );
1192
1193                // Build content parts array preserving original order
1194                let mut content_parts = Vec::new();
1195
1196                // Re-iterate through content blocks to maintain order
1197                for block in &msg.content {
1198                    match block {
1199                        ContentBlock::Text(text) => {
1200                            content_parts.push(OpenAIContentPart::text(&text.text));
1201                        }
1202                        ContentBlock::Image(image) => {
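                            // Log image details (truncate URL for privacy)
                            let url = image.url();
                            let url_display = if url.len() > 100 {
                                // Back up to a char boundary; slicing at a fixed byte
                                // offset panics if it lands inside a multi-byte character
                                let end = (0..=100)
                                    .rev()
                                    .find(|&i| url.is_char_boundary(i))
                                    .unwrap_or(0);
                                format!("{}... ({} bytes)", &url[..end], url.len())
                            } else {
                                url.to_string()
                            };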
1209                            let detail_str = match image.detail() {
1210                                crate::types::ImageDetail::Low => "low",
1211                                crate::types::ImageDetail::High => "high",
1212                                crate::types::ImageDetail::Auto => "auto",
1213                            };
1214                            log::debug!("  - Image: {} (detail: {})", url_display, detail_str);
1215
1216                            content_parts.push(OpenAIContentPart::from_image(image));
1217                        }
1218                        ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
1219                    }
1220                }
1221
1222                // Defensive check: content_parts should never be empty at this point
1223                // If it is, it indicates a logic error (e.g., all blocks were filtered out)
1224                if content_parts.is_empty() {
1225                    return Err(Error::other(
1226                        "Internal error: Message with images produced empty content array",
1227                    ));
1228                }
1229
1230                let role_str = match msg.role {
1231                    MessageRole::System => "system",
1232                    MessageRole::User => "user",
1233                    MessageRole::Assistant => "assistant",
1234                    MessageRole::Tool => "tool",
1235                };
1236
1237                messages.push(OpenAIMessage {
1238                    role: role_str.to_string(),
1239                    content: Some(OpenAIContent::Parts(content_parts)),
1240                    tool_calls: None,
1241                    tool_call_id: None,
1242                });
1243            }
1244            // Case 4: Message contains only text (normal message, backward compatible)
1245            else {
1246                let content = text_blocks
1247                    .iter()
1248                    .map(|t| t.text.as_str())
1249                    .collect::<Vec<_>>()
1250                    .join("\n");
1251
1252                let role_str = match msg.role {
1253                    MessageRole::System => "system",
1254                    MessageRole::User => "user",
1255                    MessageRole::Assistant => "assistant",
1256                    MessageRole::Tool => "tool",
1257                };
1258
1259                messages.push(OpenAIMessage {
1260                    role: role_str.to_string(),
1261                    content: Some(OpenAIContent::Text(content)),
1262                    tool_calls: None,
1263                    tool_call_id: None,
1264                });
1265            }
1266        }
1267
1268        // Convert tools to OpenAI format if any are registered
1269        // Each tool is described with name, description, and JSON Schema parameters
1270        let tools = if !self.options.tools().is_empty() {
1271            Some(
1272                self.options
1273                    .tools()
1274                    .iter()
1275                    .map(|t| t.to_openai_format())
1276                    .collect(),
1277            )
1278        } else {
1279            None
1280        };
1281
1282        // Build the OpenAI-compatible request payload
1283        let request = OpenAIRequest {
1284            model: self.options.model().to_string(),
1285            messages,
1286            stream: true, // Always stream for progressive rendering
1287            max_tokens: self.options.max_tokens(),
1288            temperature: Some(self.options.temperature()),
1289            tools,
1290        };
1291
1292        // Make HTTP POST request to chat completions endpoint
1293        let url = format!("{}/chat/completions", self.options.base_url());
1294        let response = self
1295            .http_client
1296            .post(&url)
1297            .header(
1298                "Authorization",
1299                format!("Bearer {}", self.options.api_key()),
1300            )
1301            .header("Content-Type", "application/json")
1302            .json(&request)
1303            .send()
1304            .await
1305            .map_err(Error::Http)?;
1306
1307        // Check for HTTP-level errors before processing stream
1308        // This catches authentication, rate limits, invalid models, etc.
1309        if !response.status().is_success() {
1310            let status = response.status();
            let body = response.text().await.unwrap_or_else(|e| {
                // Use the log facade (as elsewhere in this method) instead of stderr
                log::warn!("Failed to read error response body: {}", e);
                "Unknown error (failed to read response body)".to_string()
            });
1315            return Err(Error::api(format!("API error {}: {}", status, body)));
1316        }
1317
1318        // Parse Server-Sent Events (SSE) stream from response
1319        let sse_stream = parse_sse_stream(response);
1320
1321        // Aggregate SSE chunks into complete content blocks
1322        // ToolCallAggregator maintains state to handle incremental JSON chunks
1323        // that may arrive split across multiple SSE events
1324        let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
1325            let result = match chunk_result {
1326                Ok(chunk) => match aggregator.process_chunk(chunk) {
1327                    Ok(blocks) => {
1328                        if blocks.is_empty() {
1329                            Some(None) // Partial chunk, keep aggregating
1330                        } else {
1331                            Some(Some(Ok(blocks))) // Complete block(s) ready
1332                        }
1333                    }
1334                    Err(e) => Some(Some(Err(e))), // Processing error
1335                },
1336                Err(e) => Some(Some(Err(e))), // Stream error
1337            };
1338            futures::future::ready(result)
1339        });
1340
1341        // Flatten the stream to emit individual blocks
1342        // filter_map removes None values (partial chunks)
1343        // flat_map converts Vec<ContentBlock> to individual items
1344        let flattened = stream
1345            .filter_map(|item| async move { item })
1346            .flat_map(|result| {
1347                futures::stream::iter(match result {
1348                    Ok(blocks) => blocks.into_iter().map(Ok).collect(),
1349                    Err(e) => vec![Err(e)],
1350                })
1351            });
1352
1353        // Store the stream for consumption via receive()
1354        // The stream is NOT consumed here - that happens in receive()
1355        self.current_stream = Some(Box::pin(flattened));
1356
1357        Ok(())
1358    }
1359
1360    /// Internal method that returns one block from the current stream.
1361    ///
1362    /// This is the core streaming logic extracted for reuse by both manual mode
1363    /// and auto-execution mode. It handles interrupt checking and stream consumption.
1364    ///
1365    /// # Returns
1366    ///
1367    /// - `Ok(Some(block))`: Successfully received a content block
1368    /// - `Ok(None)`: Stream ended naturally or was interrupted
1369    /// - `Err(e)`: An error occurred during streaming
1370    ///
1371    /// # State Changes
1372    ///
1373    /// - Sets `current_stream` to `None` if interrupted or stream ends
1374    /// - Does not modify history or other state
1375    ///
1376    /// # Implementation Notes
1377    ///
    /// This method checks the interrupt flag on every call, so a cancellation
    /// request takes effect at the next block boundary. The load uses `SeqCst`,
    /// matching the `SeqCst` stores made by `send()` and by interrupting threads.
1381    async fn receive_one(&mut self) -> Result<Option<ContentBlock>> {
1382        // Check interrupt flag before attempting to receive
1383        // Uses SeqCst to ensure we see the latest value from any thread
1384        if self.interrupted.load(Ordering::SeqCst) {
1385            // Clear the stream and return None to signal completion
1386            self.current_stream = None;
1387            return Ok(None);
1388        }
1389
        // Poll the current stream if one exists
        if let Some(stream) = &mut self.current_stream {
            match stream.next().await {
                Some(Ok(block)) => Ok(Some(block)), // Got a block
                Some(Err(e)) => Err(e),             // Stream error
                None => {
                    // Stream ended: drop it so an exhausted stream is never polled again
                    self.current_stream = None;
                    Ok(None)
                }
            }
        } else {
            // No active stream
            Ok(None)
        }
1401    }
1402
1403    /// Collects all blocks from the current stream into a vector.
1404    ///
1405    /// Internal helper for auto-execution mode. This method buffers the entire
1406    /// response in memory, which is necessary to determine if the response contains
1407    /// tool calls before returning anything to the caller.
1408    ///
1409    /// # Returns
1410    ///
    /// - `Ok(vec)`: All blocks received before the stream ended or an interrupt stopped it
    /// - `Err(e)`: Stream error, or an interrupt detected right after a block arrived
    ///
    /// # Memory Usage
    ///
    /// This buffers the entire response, which can be large for long completions.
    /// Consider the memory implications when using auto-execution mode.
    ///
    /// # Interruption
    ///
    /// `receive_one()` returns `Ok(None)` once the interrupt flag is set, which ends
    /// collection with the blocks gathered so far. If the flag is set while a block
    /// is being received, the check inside the loop returns an error instead.
1422    async fn collect_all_blocks(&mut self) -> Result<Vec<ContentBlock>> {
1423        let mut blocks = Vec::new();
1424
1425        // Consume entire stream into vector
1426        while let Some(block) = self.receive_one().await? {
1427            // Check interrupt during collection for responsiveness
1428            if self.interrupted.load(Ordering::SeqCst) {
1429                self.current_stream = None;
1430                return Err(Error::other(
1431                    "Operation interrupted during block collection",
1432                ));
1433            }
1434
1435            blocks.push(block);
1436        }
1437
1438        Ok(blocks)
1439    }
1440
1441    /// Executes a tool by name with the given input.
1442    ///
1443    /// Internal helper for auto-execution mode. Looks up the tool in the registered
1444    /// tools list and executes it with the provided input.
1445    ///
1446    /// # Parameters
1447    ///
1448    /// - `tool_name`: Name of the tool to execute
1449    /// - `input`: JSON value containing tool parameters
1450    ///
1451    /// # Returns
1452    ///
1453    /// - `Ok(result)`: Tool executed successfully, returns result as JSON
1454    /// - `Err(e)`: Tool not found or execution failed
1455    ///
1456    /// # Error Handling
1457    ///
1458    /// If the tool is not found in the registry, returns a ToolError.
1459    /// If execution fails, the error from the tool is propagated.
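    ///
    /// # Examples
    ///
    /// A minimal sketch of the lookup-then-execute flow, assuming a simplified
    /// synchronous tool. `Tool`, `run`, and `execute` here are hypothetical; the
    /// SDK's real tools are async and take and return `serde_json::Value`.
    ///
    /// ```rust
    /// // Hypothetical, simplified tool: a name plus a plain function pointer.
    /// struct Tool {
    ///     name: &'static str,
    ///     run: fn(i64) -> Result<i64, String>,
    /// }
    ///
    /// // Finds the tool by name and runs it; an unknown name becomes an error.
    /// fn execute(tools: &[Tool], name: &str, input: i64) -> Result<i64, String> {
    ///     let tool = tools
    ///         .iter()
    ///         .find(|t| t.name == name)
    ///         .ok_or_else(|| format!("Tool '{}' not found", name))?;
    ///     // Any error from the tool itself is propagated unchanged
    ///     (tool.run)(input)
    /// }
    /// ```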
1460    async fn execute_tool_internal(
1461        &self,
1462        tool_name: &str,
1463        input: serde_json::Value,
1464    ) -> Result<serde_json::Value> {
1465        // Find tool in registered tools by name
1466        let tool = self
1467            .options
1468            .tools()
1469            .iter()
1470            .find(|t| t.name() == tool_name)
1471            .ok_or_else(|| Error::tool(format!("Tool '{}' not found", tool_name)))?;
1472
1473        // Execute the tool's async function
1474        tool.execute(input).await
1475    }
1476
1477    /// Auto-execution loop that handles tool calls automatically.
1478    ///
1479    /// This is the core implementation of automatic tool execution mode. It:
1480    ///
1481    /// 1. Collects all blocks from the current stream
1482    /// 2. Separates text blocks from tool use blocks
1483    /// 3. If there are tool blocks:
1484    ///    - Executes PreToolUse hooks (can modify/block)
1485    ///    - Executes each tool via its registered function
1486    ///    - Executes PostToolUse hooks (can modify result)
1487    ///    - Adds results to history
1488    ///    - Continues conversation with send("")
1489    /// 4. Repeats until text-only response or max iterations
1490    /// 5. Returns all final text blocks
1491    ///
1492    /// # Returns
1493    ///
1494    /// - `Ok(blocks)`: Final text blocks after all tool iterations
1495    /// - `Err(e)`: Error during execution, stream processing, or interruption
1496    ///
1497    /// # Iteration Limit
1498    ///
    /// The loop is bounded by `options.max_tool_iterations` to prevent infinite loops.
    /// Once that many tool rounds have run, the next response that requests tools is
    /// not executed: its text blocks are returned and its tool calls are dropped.
1502    ///
1503    /// # Hook Integration
1504    ///
1505    /// Hooks are executed for each tool call:
1506    /// - **PreToolUse**: Can modify input or block execution entirely
1507    /// - **PostToolUse**: Can modify the result before it's added to history
1508    ///
1509    /// If a hook blocks execution, a JSON error response is used as the tool result.
1510    ///
1511    /// # State Management
1512    ///
1513    /// The loop maintains history by adding:
1514    /// - Assistant messages with text + tool use blocks
1515    /// - User messages with tool result blocks
1516    ///
1517    /// This creates a proper conversation flow that the model can follow.
1518    ///
1519    /// # Error Recovery
1520    ///
1521    /// If a tool execution fails, the error is converted to a JSON error response
1522    /// and added as the tool result. This allows the conversation to continue
1523    /// and lets the model handle the error.
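    ///
    /// # Examples
    ///
    /// A minimal, synchronous sketch of the bounded loop described above, leaving out
    /// hooks and history. `Reply`, `run_loop`, `next_reply`, and `run_tool` are
    /// hypothetical names used for illustration only.
    ///
    /// ```rust
    /// // Hypothetical reply: text plus the names of any requested tools.
    /// struct Reply {
    ///     text: String,
    ///     tool_calls: Vec<String>,
    /// }
    ///
    /// // Pulls replies until one has no tool calls or the round limit is reached.
    /// // Returns the final text and the number of tool rounds actually executed.
    /// fn run_loop(
    ///     mut next_reply: impl FnMut() -> Reply,
    ///     mut run_tool: impl FnMut(&str),
    ///     max_iterations: u32,
    /// ) -> (String, u32) {
    ///     let mut iteration = 0;
    ///     loop {
    ///         let reply = next_reply();
    ///         // Text-only reply: this is the final answer
    ///         if reply.tool_calls.is_empty() {
    ///             return (reply.text, iteration);
    ///         }
    ///         // Limit reached: return the text and drop the pending tool calls
    ///         if iteration == max_iterations {
    ///             return (reply.text, iteration);
    ///         }
    ///         iteration += 1;
    ///         for name in &reply.tool_calls {
    ///             run_tool(name);
    ///         }
    ///     }
    /// }
    /// ```
    ///
    /// With a model that always requests a tool and `max_iterations` set to 2, this
    /// sketch runs exactly two tool rounds before returning.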
1524    async fn auto_execute_loop(&mut self) -> Result<Vec<ContentBlock>> {
1525        use crate::types::ToolResultBlock;
1526
1527        // Track iterations to prevent infinite loops
1528        let mut iteration = 0;
1529        let max_iterations = self.options.max_tool_iterations();
1530
1531        loop {
1532            // ========================================================================
1533            // STEP 1: Collect all blocks from current stream
1534            // ========================================================================
1535            // Buffer the entire response to determine if it contains tool calls
1536            let blocks = self.collect_all_blocks().await?;
1537
1538            // Empty response means stream ended or was interrupted
1539            if blocks.is_empty() {
1540                return Ok(Vec::new());
1541            }
1542
1543            // ========================================================================
1544            // STEP 2: Separate text blocks from tool use blocks
1545            // ========================================================================
1546            // The model can return a mix of text and tool calls in one response
1547            let mut text_blocks = Vec::new();
1548            let mut tool_blocks = Vec::new();
1549
1550            for block in blocks {
1551                match block {
1552                    ContentBlock::Text(_) => text_blocks.push(block),
1553                    ContentBlock::ToolUse(_) => tool_blocks.push(block),
1554                    ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {} // Ignore ToolResult and Image variants
1555                }
1556            }
1557
1558            // ========================================================================
1559            // STEP 3: Check if we're done (no tool calls)
1560            // ========================================================================
1561            // If the response contains no tool calls, we've reached the final answer
1562            if tool_blocks.is_empty() {
1563                // Add assistant's final text response to history
1564                if !text_blocks.is_empty() {
1565                    let assistant_msg = Message::assistant(text_blocks.clone());
1566                    self.history.push(assistant_msg);
1567                }
1568                // Return text blocks to caller via buffered receive()
1569                return Ok(text_blocks);
1570            }
1571
1572            // ========================================================================
1573            // STEP 4: Check iteration limit BEFORE executing tools
1574            // ========================================================================
1575            // Increment counter and check if we've hit the max
1576            iteration += 1;
1577            if iteration > max_iterations {
1578                // Max iterations reached - stop execution and return what we have
1579                // This prevents infinite tool-calling loops
1580                if !text_blocks.is_empty() {
1581                    let assistant_msg = Message::assistant(text_blocks.clone());
1582                    self.history.push(assistant_msg);
1583                }
1584                return Ok(text_blocks);
1585            }
1586
1587            // ========================================================================
1588            // STEP 5: Add assistant message to history
1589            // ========================================================================
1590            // The assistant message includes BOTH text and tool use blocks
1591            // This preserves the full context for future turns
            // `text_blocks` is not used again in this iteration, so move it instead of cloning
            let mut all_blocks = text_blocks;
            all_blocks.extend(tool_blocks.iter().cloned());
1594            let assistant_msg = Message::assistant(all_blocks);
1595            self.history.push(assistant_msg);
1596
1597            // ========================================================================
1598            // STEP 6: Execute all tools and collect results
1599            // ========================================================================
1600            for block in tool_blocks {
1601                if let ContentBlock::ToolUse(tool_use) = block {
1602                    // Create simplified history snapshot for hooks
1603                    // TODO: Full serialization of history for hooks
1604                    let history_snapshot: Vec<serde_json::Value> =
1605                        self.history.iter().map(|_| serde_json::json!({})).collect();
1606
1607                    // ============================================================
1608                    // Execute PreToolUse hooks
1609                    // ============================================================
1610                    use crate::hooks::PreToolUseEvent;
1611                    let pre_event = PreToolUseEvent::new(
1612                        tool_use.name().to_string(),
1613                        tool_use.input().clone(),
1614                        tool_use.id().to_string(),
1615                        history_snapshot.clone(),
1616                    );
1617
1618                    // Track whether to execute and what input to use
1619                    let mut tool_input = tool_use.input().clone();
1620                    let mut should_execute = true;
1621                    let mut block_reason = None;
1622
1623                    // Execute all PreToolUse hooks
1624                    if let Some(decision) =
1625                        self.options.hooks().execute_pre_tool_use(pre_event).await
1626                    {
1627                        if !decision.continue_execution() {
1628                            // Hook blocked execution
1629                            should_execute = false;
1630                            block_reason = decision.reason().map(|s| s.to_string());
1631                        } else if let Some(modified) = decision.modified_input() {
1632                            // Hook modified the input
1633                            tool_input = modified.clone();
1634                        }
1635                    }
1636
1637                    // ============================================================
1638                    // Execute tool (or create error result if blocked)
1639                    // ============================================================
1640                    let result = if should_execute {
1641                        // Actually execute the tool
1642                        match self
1643                            .execute_tool_internal(tool_use.name(), tool_input.clone())
1644                            .await
1645                        {
1646                            Ok(res) => res, // Success - use the result
1647                            Err(e) => {
1648                                // Tool execution failed - convert to JSON error
1649                                // This allows the conversation to continue
1650                                serde_json::json!({
1651                                    "error": e.to_string(),
1652                                    "tool": tool_use.name(),
1653                                    "id": tool_use.id()
1654                                })
1655                            }
1656                        }
1657                    } else {
1658                        // Tool blocked by PreToolUse hook - create error result
1659                        serde_json::json!({
1660                            "error": "Tool execution blocked by hook",
1661                            "reason": block_reason.unwrap_or_else(|| "No reason provided".to_string()),
1662                            "tool": tool_use.name(),
1663                            "id": tool_use.id()
1664                        })
1665                    };
1666
1667                    // ============================================================
1668                    // Execute PostToolUse hooks
1669                    // ============================================================
1670                    use crate::hooks::PostToolUseEvent;
1671                    let post_event = PostToolUseEvent::new(
1672                        tool_use.name().to_string(),
1673                        tool_input,
1674                        tool_use.id().to_string(),
1675                        result.clone(),
1676                        history_snapshot,
1677                    );
1678
1679                    let mut final_result = result;
1680                    if let Some(decision) =
1681                        self.options.hooks().execute_post_tool_use(post_event).await
1682                    {
1683                        // PostToolUse can modify the result
1684                        // Note: Uses modified_input field (naming is historical)
1685                        if let Some(modified) = decision.modified_input() {
1686                            final_result = modified.clone();
1687                        }
1688                    }
1689
1690                    // ============================================================
1691                    // Add tool result to history
1692                    // ============================================================
                    // Tool results are stored in history as user messages carrying a
                    // ToolResult block; they are serialized as role "tool" messages
                    // when the request payload is built
1694                    let tool_result = ToolResultBlock::new(tool_use.id(), final_result);
1695                    let tool_result_msg =
1696                        Message::user_with_blocks(vec![ContentBlock::ToolResult(tool_result)]);
1697                    self.history.push(tool_result_msg);
1698                }
1699            }
1700
1701            // ========================================================================
1702            // STEP 7: Continue conversation to get next response
1703            // ========================================================================
1704            // Send empty string to continue - the history contains all context
1705            self.send("").await?;
1706
1707            // Loop continues to collect and process the next response
1708            // This will either be more tool calls or the final text answer
1709        }
1710    }
1711
1712    /// Receives the next content block from the current stream.
1713    ///
1714    /// This is the primary method for consuming responses from the model. It works
1715    /// differently depending on the operating mode:
1716    ///
1717    /// ## Manual Mode (default)
1718    ///
1719    /// Streams blocks directly from the API response as they arrive. You receive:
1720    /// - `TextBlock`: Incremental text from the model
1721    /// - `ToolUseBlock`: Requests to execute tools
1722    /// - Other block types as they're emitted
1723    ///
1724    /// When you receive a `ToolUseBlock`, you must:
1725    /// 1. Execute the tool yourself
1726    /// 2. Call `add_tool_result()` with the result
1727    /// 3. Call `send("")` to continue the conversation
1728    ///
1729    /// ## Automatic Mode (`auto_execute_tools = true`)
1730    ///
1731    /// Transparently executes tools and only returns final text blocks. The first
1732    /// call to `receive()` triggers the auto-execution loop which:
1733    /// 1. Collects all blocks from the stream
1734    /// 2. Executes any tool calls automatically
1735    /// 3. Continues the conversation until reaching a text-only response
1736    /// 4. Buffers the final text blocks
1737    /// 5. Returns them one at a time on subsequent `receive()` calls
1738    ///
1739    /// # Returns
1740    ///
1741    /// - `Ok(Some(block))`: Successfully received a content block
1742    /// - `Ok(None)`: Stream ended normally or was interrupted
1743    /// - `Err(e)`: An error occurred during streaming or tool execution
1744    ///
1745    /// # Behavior Details
1746    ///
1747    /// ## Interruption
1748    ///
1749    /// Checks the interrupt flag on every call. If interrupted, immediately returns
1750    /// `Ok(None)` and clears the stream. The client can be reused after interruption.
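    ///
    /// A minimal sketch of this check, assuming a simplified stand-in for the client
    /// (`SketchStream` and its fields are hypothetical and only model the flag check
    /// and stream reset; they are not the SDK's own types):
    ///
    /// ```rust
    /// use std::sync::atomic::{AtomicBool, Ordering};
    /// use std::sync::Arc;
    ///
    /// // Hypothetical stand-in: a shared flag plus an optional active stream.
    /// struct SketchStream {
    ///     interrupted: Arc<AtomicBool>,
    ///     stream: Option<std::vec::IntoIter<&'static str>>,
    /// }
    ///
    /// impl SketchStream {
    ///     fn receive(&mut self) -> Option<&'static str> {
    ///         // Check the flag before yielding anything.
    ///         if self.interrupted.load(Ordering::SeqCst) {
    ///             // Drop the stream so later calls also report "ended".
    ///             self.stream = None;
    ///             return None;
    ///         }
    ///         // No active stream, or stream finished: report "ended".
    ///         self.stream.as_mut()?.next()
    ///     }
    /// }
    /// ```
    ///
    /// Storing `false` in the flag and installing a new stream models the reuse after
    /// interruption described above.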
1751    ///
1752    /// ## Stream Lifecycle
1753    ///
1754    /// 1. After `send()`, stream is active
1755    /// 2. Each `receive()` call yields one block
1756    /// 3. When stream ends, returns `Ok(None)`
1757    /// 4. Subsequent calls continue returning `Ok(None)` until next `send()`
1758    ///
1759    /// ## Auto-Execution Buffer
1760    ///
1761    /// In auto mode, blocks are buffered in memory. The buffer persists until
1762    /// fully consumed (index reaches length), at which point it's cleared.
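    ///
    /// The buffer-and-index behavior described above could be modeled roughly as
    /// follows. This is a sketch of the description, not necessarily the exact
    /// implementation; `BlockBuffer` is a hypothetical type and uses `String` in
    /// place of `ContentBlock`:
    ///
    /// ```rust
    /// // Hypothetical model of the buffer/index pair.
    /// struct BlockBuffer {
    ///     blocks: Vec<String>,
    ///     index: usize,
    /// }
    ///
    /// impl BlockBuffer {
    ///     fn next_block(&mut self) -> Option<String> {
    ///         // Nothing left at the cursor: report exhaustion.
    ///         let block = self.blocks.get(self.index).cloned()?;
    ///         self.index += 1;
    ///         // Once the cursor reaches the end, release the buffered blocks.
    ///         if self.index == self.blocks.len() {
    ///             self.blocks.clear();
    ///             self.index = 0;
    ///         }
    ///         Some(block)
    ///     }
    /// }
    /// ```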
1763    ///
1764    /// # State Changes
1765    ///
1766    /// - Advances stream position
1767    /// - In auto mode: May trigger entire execution loop and modify history
1768    /// - In manual mode: Only reads from stream, no history changes
1769    /// - Increments `auto_exec_index` when returning buffered blocks
1770    ///
1771    /// # Examples
1772    ///
1773    /// ## Manual Mode - Basic
1774    ///
1775    /// ```rust,no_run
1776    /// # use open_agent::{Client, AgentOptions, ContentBlock};
1777    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1778    /// # let mut client = Client::new(AgentOptions::default())?;
1779    /// client.send("Hello!").await?;
1780    ///
1781    /// while let Some(block) = client.receive().await? {
1782    ///     match block {
1783    ///         ContentBlock::Text(text) => print!("{}", text.text),
1784    ///         ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
1785    ///     }
1786    /// }
1787    /// # Ok(())
1788    /// # }
1789    /// ```
1790    ///
1791    /// ## Manual Mode - With Tools
1792    ///
1793    /// ```rust,no_run
1794    /// # use open_agent::{Client, AgentOptions, ContentBlock};
1795    /// # use serde_json::json;
1796    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1797    /// # let mut client = Client::new(AgentOptions::default())?;
1798    /// client.send("Use the calculator").await?;
1799    ///
1800    /// while let Some(block) = client.receive().await? {
1801    ///     match block {
1802    ///         ContentBlock::Text(text) => {
1803    ///             println!("{}", text.text);
1804    ///         }
1805    ///         ContentBlock::ToolUse(tool_use) => {
1806    ///             println!("Executing: {}", tool_use.name());
1807    ///
1808    ///             // Execute tool manually
1809    ///             let result = json!({"result": 42});
1810    ///
1811    ///             // Add result and continue
1812    ///             client.add_tool_result(tool_use.id(), result)?;
1813    ///             client.send("").await?;
1814    ///         }
1815    ///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
1816    ///     }
1817    /// }
1818    /// # Ok(())
1819    /// # }
1820    /// ```
1821    ///
1822    /// ## Auto Mode
1823    ///
1824    /// ```rust,no_run
1825    /// # use open_agent::{Client, AgentOptions, ContentBlock, Tool};
1826    /// # use serde_json::json;
1827    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1828    /// let mut client = Client::new(AgentOptions::builder()
1829    ///     .auto_execute_tools(true)
1830    ///     .build()?)?;
1831    ///
1832    /// client.send("Calculate 2+2").await?;
1833    ///
1834    /// // Tools execute automatically - you only get final text
1835    /// while let Some(block) = client.receive().await? {
1836    ///     if let ContentBlock::Text(text) = block {
1837    ///         println!("{}", text.text);
1838    ///     }
1839    /// }
1840    /// # Ok(())
1841    /// # }
1842    /// ```
1843    ///
1844    /// ## With Error Handling
1845    ///
1846    /// ```rust,no_run
1847    /// # use open_agent::{Client, AgentOptions};
1848    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1849    /// # let mut client = Client::new(AgentOptions::default())?;
1850    /// client.send("Hello").await?;
1851    ///
1852    /// loop {
1853    ///     match client.receive().await {
1854    ///         Ok(Some(block)) => {
1855    ///             // Process block
1856    ///         }
1857    ///         Ok(None) => {
1858    ///             // Stream ended
1859    ///             break;
1860    ///         }
1861    ///         Err(e) => {
1862    ///             eprintln!("Error: {}", e);
1863    ///             break;
1864    ///         }
1865    ///     }
1866    /// }
1867    /// # Ok(())
1868    /// # }
1869    /// ```
1870    ///
1871    /// Sends a pre-built message to the AI model.
1872    ///
1873    /// This method allows sending messages with images or custom content blocks
1874    /// that cannot be expressed as simple text prompts. Use the `Message` helper
1875    /// methods like [`user_with_image()`](Message::user_with_image),
1876    /// [`user_with_image_detail()`](Message::user_with_image_detail), or
1877    /// [`user_with_base64_image()`](Message::user_with_base64_image) to create
1878    /// messages with multimodal content.
1879    ///
1880    /// Unlike [`send()`](Client::send), this method:
1881    /// - Accepts pre-built `Message` objects instead of text prompts
1882    /// - Bypasses `UserPromptSubmit` hooks (since message is already constructed)
1883    /// - Enables multimodal interactions (text + images)
1884    ///
1885    /// After calling this method, use [`receive()`](Client::receive) to get the
1886    /// response content blocks.
1887    ///
1888    /// # Arguments
1889    ///
1890    /// * `message` - A pre-built message (typically created with `Message::user_with_image()` or similar helpers)
1891    ///
1892    /// # Errors
1893    ///
1894    /// Returns `Error` if:
1895    /// - Network request fails
1896    /// - Server returns an error
1897    /// - Response cannot be parsed
1898    /// - Request is interrupted via [`interrupt()`](Client::interrupt)
1899    ///
1900    /// # Example
1901    ///
1902    /// ```rust,no_run
1903    /// use open_agent::{Client, AgentOptions, Message, ImageDetail};
1904    ///
1905    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1906    /// let options = AgentOptions::builder()
1907    ///     .model("gpt-4-vision-preview")
1908    ///     .base_url("http://localhost:1234/v1")
1909    ///     .build()?;
1910    ///
1911    /// let mut client = Client::new(options)?;
1912    ///
1913    /// // Send a message with an image
1914    /// let msg = Message::user_with_image(
1915    ///     "What's in this image?",
1916    ///     "https://example.com/photo.jpg"
1917    /// )?;
1918    /// client.send_message(msg).await?;
1919    ///
1920    /// // Receive the response
1921    /// while let Some(block) = client.receive().await? {
1922    ///     // Process response blocks
1923    /// }
1924    /// # Ok(())
1925    /// # }
1926    /// ```
1927    pub async fn send_message(&mut self, message: Message) -> Result<()> {
1928        // Reset interrupt flag for new query
1929        // This allows the client to be reused after a previous interruption
1930        // Uses SeqCst ordering to ensure visibility across all threads
1931        self.interrupted.store(false, Ordering::SeqCst);
1932
1933        // Note: We do NOT run UserPromptSubmit hooks here because:
1934        // 1. The message is already fully constructed
1935        // 2. Hooks expect string prompts, not complex Message objects
1936        // 3. For multimodal messages, there's no single "prompt" to modify
1937
1938        // Add message to history BEFORE sending request
1939        // This ensures history consistency even if request fails
1940        self.history.push(message);
1941
1942        // The rest of the logic is identical to send() - build and execute request
1943        // Build messages array for API request
1944        // This includes system prompt + full conversation history
1945        let mut messages = Vec::new();
1946
1947        // Add system prompt as first message if configured
1948        // System prompts are added fresh for each request (not from history)
1949        if !self.options.system_prompt().is_empty() {
1950            messages.push(OpenAIMessage {
1951                role: "system".to_string(),
1952                content: Some(OpenAIContent::Text(
1953                    self.options.system_prompt().to_string(),
1954                )),
1955                tool_calls: None,
1956                tool_call_id: None,
1957            });
1958        }
1959
1960        // Convert conversation history to OpenAI message format
1961        // This includes user prompts, assistant responses, and tool results
1962        for msg in &self.history {
1963            // Separate blocks by type to determine message structure
1964            let mut text_blocks = Vec::new();
1965            let mut image_blocks = Vec::new();
1966            let mut tool_use_blocks = Vec::new();
1967            let mut tool_result_blocks = Vec::new();
1968
1969            for block in &msg.content {
1970                match block {
1971                    ContentBlock::Text(text) => text_blocks.push(text),
1972                    ContentBlock::Image(image) => image_blocks.push(image),
1973                    ContentBlock::ToolUse(tool_use) => tool_use_blocks.push(tool_use),
1974                    ContentBlock::ToolResult(tool_result) => tool_result_blocks.push(tool_result),
1975                }
1976            }
1977
1978            // Handle different message types based on content blocks
1979            // Case 1: Message contains tool results (should be separate tool messages)
1980            if !tool_result_blocks.is_empty() {
1981                for tool_result in tool_result_blocks {
                    // Serialize the tool result content as a JSON string.
                    // The fallback is built with json! so the error text is escaped
                    // and the payload stays valid JSON
                    let content =
                        serde_json::to_string(tool_result.content()).unwrap_or_else(|e| {
                            serde_json::json!({ "error": format!("Failed to serialize: {}", e) })
                                .to_string()
                        });
1987
1988                    messages.push(OpenAIMessage {
1989                        role: "tool".to_string(),
1990                        content: Some(OpenAIContent::Text(content)),
1991                        tool_calls: None,
1992                        tool_call_id: Some(tool_result.tool_use_id().to_string()),
1993                    });
1994                }
1995            }
1996            // Case 2: Message contains tool use blocks (assistant with tool calls)
1997            else if !tool_use_blocks.is_empty() {
1998                // Build tool_calls array
1999                let tool_calls: Vec<OpenAIToolCall> = tool_use_blocks
2000                    .iter()
2001                    .map(|tool_use| {
2002                        // Serialize the input as a JSON string (OpenAI API requirement)
2003                        let arguments = serde_json::to_string(tool_use.input())
2004                            .unwrap_or_else(|_| "{}".to_string());
2005
2006                        OpenAIToolCall {
2007                            id: tool_use.id().to_string(),
2008                            call_type: "function".to_string(),
2009                            function: OpenAIFunction {
2010                                name: tool_use.name().to_string(),
2011                                arguments,
2012                            },
2013                        }
2014                    })
2015                    .collect();
2016
2017                // Extract any text content (some models include reasoning before tool calls)
2018                // Note: OpenAI API requires content field even if empty when tool_calls present
2019                let content = if !text_blocks.is_empty() {
2020                    let text = text_blocks
2021                        .iter()
2022                        .map(|t| t.text.as_str())
2023                        .collect::<Vec<_>>()
2024                        .join("\n");
2025                    Some(OpenAIContent::Text(text))
2026                } else {
2027                    // Empty string satisfies OpenAI API schema (content is required)
2028                    Some(OpenAIContent::Text(String::new()))
2029                };
2030
2031                messages.push(OpenAIMessage {
2032                    role: "assistant".to_string(),
2033                    content,
2034                    tool_calls: Some(tool_calls),
2035                    tool_call_id: None,
2036                });
2037            }
2038            // Case 3: Message contains images (use OpenAIContent::Parts)
2039            else if !image_blocks.is_empty() {
2040                // Log debug info about images being serialized
2041                log::debug!(
2042                    "Serializing message with {} image(s) for {:?} role",
2043                    image_blocks.len(),
2044                    msg.role
2045                );
2046
2047                // Build content parts array preserving original order
2048                let mut content_parts = Vec::new();
2049
2050                // Re-iterate through content blocks to maintain order
2051                for block in &msg.content {
2052                    match block {
2053                        ContentBlock::Text(text) => {
2054                            content_parts.push(OpenAIContentPart::text(&text.text));
2055                        }
2056                        ContentBlock::Image(image) => {
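                            // Log image details (truncate URL for privacy).
                            // Cut on a character boundary: byte-slicing at a fixed
                            // offset panics if it lands inside a multi-byte character.
                            let url = image.url();
                            let url_display = match url.char_indices().nth(100) {
                                Some((end, _)) => {
                                    format!("{}... ({} chars)", &url[..end], url.chars().count())
                                }
                                None => url.to_string(),
                            };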
2063                            let detail_str = match image.detail() {
2064                                crate::types::ImageDetail::Low => "low",
2065                                crate::types::ImageDetail::High => "high",
2066                                crate::types::ImageDetail::Auto => "auto",
2067                            };
2068                            log::debug!("  - Image: {} (detail: {})", url_display, detail_str);
2069
2070                            content_parts.push(OpenAIContentPart::from_image(image));
2071                        }
2072                        ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
2073                    }
2074                }
2075
2076                // Defensive check: content_parts should never be empty at this point
2077                // If it is, it indicates a logic error (e.g., all blocks were filtered out)
2078                if content_parts.is_empty() {
2079                    return Err(Error::other(
2080                        "Internal error: Message with images produced empty content array",
2081                    ));
2082                }
2083
2084                let role_str = match msg.role {
2085                    MessageRole::System => "system",
2086                    MessageRole::User => "user",
2087                    MessageRole::Assistant => "assistant",
2088                    MessageRole::Tool => "tool",
2089                };
2090
2091                messages.push(OpenAIMessage {
2092                    role: role_str.to_string(),
2093                    content: Some(OpenAIContent::Parts(content_parts)),
2094                    tool_calls: None,
2095                    tool_call_id: None,
2096                });
2097            }
2098            // Case 4: Message contains only text (normal message, backward compatible)
2099            else {
2100                let content = text_blocks
2101                    .iter()
2102                    .map(|t| t.text.as_str())
2103                    .collect::<Vec<_>>()
2104                    .join("\n");
2105
2106                let role_str = match msg.role {
2107                    MessageRole::System => "system",
2108                    MessageRole::User => "user",
2109                    MessageRole::Assistant => "assistant",
2110                    MessageRole::Tool => "tool",
2111                };
2112
2113                messages.push(OpenAIMessage {
2114                    role: role_str.to_string(),
2115                    content: Some(OpenAIContent::Text(content)),
2116                    tool_calls: None,
2117                    tool_call_id: None,
2118                });
2119            }
2120        }
2121
2122        // Convert tools to OpenAI format if any are registered
2123        let tools = if !self.options.tools().is_empty() {
2124            Some(
2125                self.options
2126                    .tools()
2127                    .iter()
2128                    .map(|t| t.to_openai_format())
2129                    .collect(),
2130            )
2131        } else {
2132            None
2133        };
2134
2135        // Build the OpenAI-compatible request payload
2136        let request = OpenAIRequest {
2137            model: self.options.model().to_string(),
2138            messages,
2139            stream: true,
2140            max_tokens: self.options.max_tokens(),
2141            temperature: Some(self.options.temperature()),
2142            tools,
2143        };
2144
2145        // Make HTTP POST request to chat completions endpoint
2146        let url = format!("{}/chat/completions", self.options.base_url());
2147        let response = self
2148            .http_client
2149            .post(&url)
2150            .header(
2151                "Authorization",
2152                format!("Bearer {}", self.options.api_key()),
2153            )
2154            .header("Content-Type", "application/json")
2155            .json(&request)
2156            .send()
2157            .await
2158            .map_err(Error::Http)?;
2159
2160        // Check for HTTP-level errors
2161        if !response.status().is_success() {
2162            let status = response.status();
            let body = response.text().await.unwrap_or_else(|e| {
                // Report through the log facade (as the image logging above does)
                // instead of writing to stderr from library code
                log::warn!("Failed to read error response body: {}", e);
                "Unknown error (failed to read response body)".to_string()
            });
2167            return Err(Error::api(format!("API error {}: {}", status, body)));
2168        }
2169
2170        // Parse Server-Sent Events stream
2171        let sse_stream = parse_sse_stream(response);
2172
2173        // Aggregate SSE chunks into complete content blocks
2174        let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
2175            let result = match chunk_result {
2176                Ok(chunk) => match aggregator.process_chunk(chunk) {
2177                    Ok(blocks) => {
2178                        if blocks.is_empty() {
2179                            Some(None) // Partial chunk
2180                        } else {
2181                            Some(Some(Ok(blocks))) // Complete blocks
2182                        }
2183                    }
2184                    Err(e) => Some(Some(Err(e))),
2185                },
2186                Err(e) => Some(Some(Err(e))),
2187            };
2188            futures::future::ready(result)
2189        });
2190
2191        // Flatten nested Options and filter out None values
2192        let stream = stream.filter_map(futures::future::ready);
2193
2194        // Flatten Vec<ContentBlock> into individual blocks
2195        let stream = stream.flat_map(|result| {
2196            futures::stream::iter(match result {
2197                Ok(blocks) => blocks.into_iter().map(Ok).collect(),
2198                Err(e) => vec![Err(e)],
2199            })
2200        });
2201
2202        // Store the content stream for receive() to consume
2203        self.current_stream = Some(Box::pin(stream));
2204
2205        Ok(())
2206    }
2207
    /// Receives the next content block from the current stream.
    ///
    /// In manual mode this streams blocks directly from the API response. In
    /// automatic mode the first call runs the tool-execution loop and later
    /// calls return the buffered final text blocks one at a time.
    ///
    /// Returns `Ok(None)` once the stream or buffer is exhausted.
2208    pub async fn receive(&mut self) -> Result<Option<ContentBlock>> {
2209        // ========================================================================
2210        // AUTO-EXECUTION MODE
2211        // ========================================================================
2212        if self.options.auto_execute_tools() {
2213            // Check if we have buffered blocks to return
2214            // In auto mode, all final text blocks are buffered and returned one at a time
2215            if self.auto_exec_index < self.auto_exec_buffer.len() {
2216                // Return next buffered block
2217                let block = self.auto_exec_buffer[self.auto_exec_index].clone();
2218                self.auto_exec_index += 1;
2219                return Ok(Some(block));
2220            }
2221
2222            // No buffered blocks - need to run auto-execution loop
2223            // This only happens on the first receive() call after send()
2224            if self.auto_exec_buffer.is_empty() {
                // Buffer all final text blocks; `?` propagates any loop error
                self.auto_exec_buffer = self.auto_execute_loop().await?;

                // Return the first buffered block, or None for an empty response.
                // The index becomes 1 if a block was returned and 0 otherwise.
                let first = self.auto_exec_buffer.first().cloned();
                self.auto_exec_index = usize::from(first.is_some());
                return Ok(first);
2243            }
2244
2245            // Buffer exhausted - return None
2246            Ok(None)
2247        } else {
2248            // ====================================================================
2249            // MANUAL MODE
2250            // ====================================================================
2251            // Stream blocks directly from API without buffering or auto-execution
2252            self.receive_one().await
2253        }
2254    }
2255
2256    /// Interrupts the current operation by setting the interrupt flag.
    ///
    /// This method provides a thread-safe way to cancel any in-progress streaming
    /// operation. The interrupt flag is checked by `receive()` before each block,
    /// allowing responsive cancellation.
    ///
    /// # Behavior
    ///
    /// - Sets the atomic interrupt flag to `true`
    /// - Next `receive()` call will return `Ok(None)` and clear the stream
    /// - Flag is automatically reset to `false` on next `send()` call
    /// - Safe to call from any thread (uses atomic operations)
    /// - Idempotent: calling multiple times has same effect as calling once
    /// - Harmless if no operation is in progress: the flag is still set, but the
    ///   next `send()` call resets it
    ///
    /// # Thread Safety
    ///
    /// The flag is stored as an `Arc<AtomicBool>`, which can be safely shared
    /// across threads. Clone it with `interrupt_handle()` to signal cancellation
    /// from another thread or async task:
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::default())?;
    /// let interrupt_handle = client.interrupt_handle();
    ///
    /// // Use from another task
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # State Changes
    ///
    /// - Sets `interrupted` flag to `true`
    /// - Does NOT modify stream, history, or other state directly
    /// - Effect takes place on next `receive()` call
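    ///
    /// The flag lifecycle described above can be sketched with a self-contained
    /// stand-in. `MockClient` and `demo` are hypothetical names used only for
    /// illustration; they are not part of this crate, and the real `send()` and
    /// `receive()` do considerably more than this sketch:
    ///
    /// ```rust
    /// use std::sync::atomic::{AtomicBool, Ordering};
    /// use std::sync::Arc;
    ///
    /// // Hypothetical stand-in for the client: pending blocks plus the shared flag.
    /// struct MockClient {
    ///     interrupted: Arc<AtomicBool>,
    ///     pending: Vec<&'static str>,
    /// }
    ///
    /// impl MockClient {
    ///     // Mirrors `send()`: reset the flag and start a new "stream".
    ///     fn send(&mut self, blocks: Vec<&'static str>) {
    ///         self.interrupted.store(false, Ordering::SeqCst);
    ///         self.pending = blocks;
    ///         self.pending.reverse(); // so that pop() yields blocks in order
    ///     }
    ///
    ///     // Mirrors `interrupt()`: only the flag changes.
    ///     fn interrupt(&self) {
    ///         self.interrupted.store(true, Ordering::SeqCst);
    ///     }
    ///
    ///     // Mirrors `receive()`: the flag is checked before each block.
    ///     fn receive(&mut self) -> Option<&'static str> {
    ///         if self.interrupted.load(Ordering::SeqCst) {
    ///             self.pending.clear(); // drop the rest of the stream
    ///             return None;
    ///         }
    ///         self.pending.pop()
    ///     }
    /// }
    ///
    /// // Receives blocks, interrupting after the first one, then sends again.
    /// fn demo() -> Vec<&'static str> {
    ///     let mut client = MockClient {
    ///         interrupted: Arc::new(AtomicBool::new(false)),
    ///         pending: Vec::new(),
    ///     };
    ///     client.send(vec!["a", "b", "c"]);
    ///
    ///     let mut seen = Vec::new();
    ///     while let Some(block) = client.receive() {
    ///         seen.push(block);
    ///         client.interrupt(); // takes effect on the next receive()
    ///     }
    ///
    ///     // A new send() resets the flag, so the client is usable again.
    ///     client.send(vec!["d"]);
    ///     seen.extend(client.receive());
    ///     seen
    /// }
    /// ```
    ///
    /// Under these assumptions `demo()` collects `"a"` and `"d"`: the blocks
    /// `"b"` and `"c"` are dropped by the interrupt, and the second `send()`
    /// starts from a cleared flag.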
    ///
    /// # Use Cases
    ///
    /// - User cancellation (e.g., stop button in UI)
    /// - Timeout enforcement
    /// - Resource cleanup
    /// - Emergency shutdown
    ///
    /// # Examples
    ///
    /// ## Basic Interruption
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// client.send("Tell me a long story").await?;
    ///
    /// // Interrupt after receiving some blocks
    /// let mut count = 0;
    /// while let Some(_block) = client.receive().await? {
    ///     count += 1;
    ///     if count >= 5 {
    ///         // The next receive() call returns Ok(None), which ends the loop
    ///         client.interrupt();
    ///     }
    /// }
    ///
    /// // Client is ready for new queries
    /// client.send("What's 2+2?").await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## With Timeout
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions};
    /// use std::time::Duration;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// client.send("Long request").await?;
    ///
    /// // Spawn timeout task
    /// let interrupt_handle = client.interrupt_handle();
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(Duration::from_secs(10)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    ///
    /// while let Some(_block) = client.receive().await? {
    ///     // Process until timeout
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn interrupt(&self) {
        // Set the interrupt flag. SeqCst is the strongest memory ordering and the
        // simplest to reason about; the flag is observed by the next receive() check.
        self.interrupted.store(true, Ordering::SeqCst);
    }

    /// Returns a clone of the interrupt handle for thread-safe cancellation.
    ///
    /// This method provides access to the shared `Arc<AtomicBool>` interrupt flag,
    /// allowing it to be used from other threads or async tasks to signal cancellation.
    ///
    /// # Returns
    ///
    /// A cloned `Arc<AtomicBool>` that can be used to interrupt operations from any thread.
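    ///
    /// Sharing the flag across OS threads works the same way as across async
    /// tasks. The sketch below uses only the standard library;
    /// `interrupt_from_thread` is a hypothetical helper for illustration, not
    /// part of this crate:
    ///
    /// ```rust
    /// use std::sync::atomic::{AtomicBool, Ordering};
    /// use std::sync::Arc;
    /// use std::thread;
    ///
    /// // Hands a clone of the flag to a worker thread, waits for the worker to
    /// // finish, and reports what the original handle observes afterwards.
    /// fn interrupt_from_thread(flag: &Arc<AtomicBool>) -> bool {
    ///     let handle = Arc::clone(flag);
    ///     let worker = thread::spawn(move || {
    ///         // Same store that `interrupt()` performs on the client's flag.
    ///         handle.store(true, Ordering::SeqCst);
    ///     });
    ///     worker.join().expect("worker thread panicked");
    ///     flag.load(Ordering::SeqCst)
    /// }
    /// ```
    ///
    /// Because both handles point at the same `AtomicBool`, a store through the
    /// clone is visible through the original once the worker has been joined.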
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::default())?;
    /// let interrupt_handle = client.interrupt_handle();
    ///
    /// // Use from another task
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    /// # Ok(())
    /// # }
    /// ```
    pub fn interrupt_handle(&self) -> Arc<AtomicBool> {
        self.interrupted.clone()
    }

    /// Returns a reference to the conversation history.
    ///
    /// The history contains all messages exchanged in the conversation, including:
    /// - User messages
    /// - Assistant messages (with text and tool use blocks)
    /// - Tool result messages
    ///
    /// # Returns
    ///
    /// A slice of `Message` objects in chronological order.
    ///
    /// # Use Cases
    ///
    /// - Inspecting conversation context
    /// - Debugging tool execution flow
    /// - Saving conversation state
    /// - Implementing custom history management
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::default())?;
    ///
    /// // Initially empty
    /// assert_eq!(client.history().len(), 0);
    /// # Ok(())
    /// # }
    /// ```
    pub fn history(&self) -> &[Message] {
        &self.history
    }

    /// Returns a mutable reference to the conversation history.
    ///
    /// This allows you to modify the history directly for advanced use cases like:
    /// - Removing old messages to manage context length
    /// - Editing messages for retry scenarios
    /// - Injecting synthetic messages for testing
    ///
    /// # Warning
    ///
    /// Modifying history directly can lead to inconsistent conversation state if not
    /// done carefully. The SDK expects history to follow the proper message flow
    /// (user → assistant → tool results → assistant, etc.).
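    ///
    /// One way to respect that flow when trimming is to cut only at a user
    /// message. The sketch below is a simplified illustration: `Role` and
    /// `trim_to_user_boundary` are hypothetical stand-ins, not crate types, and
    /// it ignores any system message the real history may hold:
    ///
    /// ```rust
    /// // Hypothetical, simplified stand-in for the crate's message roles.
    /// #[derive(Debug, Clone, Copy, PartialEq)]
    /// enum Role {
    ///     User,
    ///     Assistant,
    ///     Tool,
    /// }
    ///
    /// // Drops at least `min_drop` of the oldest entries, then keeps dropping
    /// // until the remaining history starts with a user message, so that no
    /// // tool result or assistant reply is left without the turn that led to it.
    /// fn trim_to_user_boundary(history: &mut Vec<Role>, min_drop: usize) {
    ///     let start = min_drop.min(history.len());
    ///     let cut = history[start..]
    ///         .iter()
    ///         .position(|role| *role == Role::User)
    ///         .map(|offset| start + offset)
    ///         .unwrap_or(history.len());
    ///     history.drain(..cut);
    /// }
    /// ```
    ///
    /// If no user message remains after the first `min_drop` entries, this
    /// sketch empties the history rather than leave a dangling tool result.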
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// // Remove oldest messages to stay within context limit
    /// if client.history().len() > 50 {
    ///     client.history_mut().drain(0..10);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn history_mut(&mut self) -> &mut Vec<Message> {
        &mut self.history
    }

    /// Returns a reference to the agent configuration options.
    ///
    /// Provides read-only access to the `AgentOptions` used to configure this client.
    ///
    /// # Use Cases
    ///
    /// - Inspecting current configuration
    /// - Debugging issues
    /// - Conditional logic based on settings
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::builder()
    ///     .model("gpt-4")
    ///     .base_url("http://localhost:1234/v1")
    ///     .build()?)?;
    ///
    /// println!("Using model: {}", client.options().model());
    /// # Ok(())
    /// # }
    /// ```
    pub fn options(&self) -> &AgentOptions {
        &self.options
    }

    /// Clears all conversation history.
    ///
    /// This resets the conversation to a blank slate while preserving the client
    /// configuration (tools, hooks, model, etc.). The next message will start a
    /// fresh conversation with no prior context.
    ///
    /// # State Changes
    ///
    /// - Clears `history` vector
    /// - Does NOT modify current stream, options, or other state
    ///
    /// # Use Cases
    ///
    /// - Starting a new conversation
    /// - Preventing context length issues
    /// - Clearing sensitive data
    /// - Implementing conversation sessions
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// // First conversation: drain the response
    /// client.send("Hello").await?;
    /// while client.receive().await?.is_some() {}
    ///
    /// // Clear and start fresh
    /// client.clear_history();
    ///
    /// // New conversation with no memory of previous
    /// client.send("Hello again").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn clear_history(&mut self) {
        self.history.clear();
    }

    /// Adds a tool result to the conversation history for manual tool execution.
    ///
    /// This method is used exclusively in **manual mode** after receiving a `ToolUseBlock`.
    /// The workflow is:
    ///
    /// 1. `receive()` returns a `ToolUseBlock`
    /// 2. You execute the tool yourself
    /// 3. Call `add_tool_result()` with the tool's output
    /// 4. Call `send("")` to continue the conversation
    /// 5. The model receives the tool result and generates a response
    ///
    /// # Parameters
    ///
    /// - `tool_use_id`: The unique ID from the `ToolUseBlock` (must match exactly)
    /// - `content`: The tool's output as a JSON value
    ///
    /// # Behavior
    ///
    /// Creates a `ToolResultBlock`, serializes its content to a JSON string, and adds
    /// it to conversation history as a tool message (`MessageRole::Tool`).
    /// This preserves the tool call/result pairing that the model needs to understand
    /// the conversation flow.
    ///
    /// # Errors
    ///
    /// Returns a configuration error if the content cannot be serialized to JSON.
    ///
    /// # State Changes
    ///
    /// - Appends a tool message to `history`
    /// - Does NOT modify stream or trigger any requests
    ///
    /// # Important Notes
    ///
    /// - **Not used in auto mode**: Auto-execution handles tool results automatically
    /// - **ID must match**: The `tool_use_id` must match the ID from the `ToolUseBlock`
    /// - **No validation**: This method doesn't validate the result format
    /// - **Must call send()**: After adding result(s), call `send("")` to continue
    ///
    /// # Examples
    ///
    /// ## Basic Manual Tool Execution
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Use the calculator").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     match block {
    ///         ContentBlock::ToolUse(tool_use) => {
    ///             // Execute tool manually
    ///             let result = json!({"result": 42});
    ///
    ///             // Add result to history
    ///             client.add_tool_result(tool_use.id(), result)?;
    ///
    ///             // Continue conversation to get model's response
    ///             client.send("").await?;
    ///         }
    ///         ContentBlock::Text(text) => {
    ///             println!("{}", text.text);
    ///         }
    ///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Handling Tool Errors
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// # client.send("test").await?;
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         // Try to execute tool
    ///         let result = match execute_tool(tool_use.name(), tool_use.input()) {
    ///             Ok(output) => output,
    ///             Err(e) => json!({
    ///                 "error": e.to_string(),
    ///                 "tool": tool_use.name()
    ///             })
    ///         };
    ///
    ///         client.add_tool_result(tool_use.id(), result)?;
    ///         client.send("").await?;
    ///     }
    /// }
    ///
    /// # fn execute_tool(name: &str, input: &serde_json::Value) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
    /// #     Ok(json!({}))
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Multiple Tool Calls
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Calculate 2+2 and 3+3").await?;
    ///
    /// let mut tool_calls = Vec::new();
    ///
    /// // Collect all tool calls
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         tool_calls.push(tool_use);
    ///     }
    /// }
    ///
    /// // Execute and add results for all tools
    /// for tool_call in tool_calls {
    ///     let result = json!({"result": 42}); // Execute tool
    ///     client.add_tool_result(tool_call.id(), result)?;
    /// }
    ///
    /// // Continue conversation
    /// client.send("").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn add_tool_result(&mut self, tool_use_id: &str, content: serde_json::Value) -> Result<()> {
        use crate::types::ToolResultBlock;

        // Create a tool result block with the given ID and content
        let result_block = ToolResultBlock::new(tool_use_id, content);

        // Add to history as a tool message.
        // Note: ToolResultBlock is properly serialized in build_api_request()
        // as a separate message with role="tool" and tool_call_id set. Here only
        // the block's content is stored, serialized to a JSON string in a text block.
        let serialized = serde_json::to_string(result_block.content())
            .map_err(|e| Error::config(format!("Failed to serialize tool result: {}", e)))?;

        self.history.push(Message::new(
            MessageRole::Tool,
            vec![ContentBlock::Text(TextBlock::new(serialized))],
        ));

        Ok(())
    }

    /// Looks up a registered tool by name.
    ///
    /// This method provides access to the tool registry for manual execution scenarios.
    /// It searches the tools registered in `AgentOptions` and returns a reference to
    /// the matching tool if found.
    ///
    /// # Parameters
    ///
    /// - `name`: The tool name to search for (case-sensitive)
    ///
    /// # Returns
    ///
    /// - `Some(&Tool)`: Tool found
    /// - `None`: No tool with that name
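    ///
    /// The lookup is a linear scan with an exact string comparison. A
    /// self-contained sketch of that pattern follows; the `Tool` struct,
    /// `find_tool`, and `sample_tools` here are hypothetical stand-ins, and
    /// holding the tools behind `Arc` is an assumption made for the example:
    ///
    /// ```rust
    /// use std::sync::Arc;
    ///
    /// // Hypothetical, minimal stand-in for the crate's tool type.
    /// struct Tool {
    ///     name: String,
    /// }
    ///
    /// impl Tool {
    ///     fn name(&self) -> &str {
    ///         &self.name
    ///     }
    /// }
    ///
    /// // Exact, case-sensitive lookup over a list of shared tools.
    /// // `as_ref()` turns the `&Arc<Tool>` yielded by the iterator into `&Tool`.
    /// fn find_tool<'a>(tools: &'a [Arc<Tool>], name: &str) -> Option<&'a Tool> {
    ///     tools.iter().find(|t| t.name() == name).map(|t| t.as_ref())
    /// }
    ///
    /// // Two example tools to search through.
    /// fn sample_tools() -> Vec<Arc<Tool>> {
    ///     vec![
    ///         Arc::new(Tool { name: "calculator".to_string() }),
    ///         Arc::new(Tool { name: "search".to_string() }),
    ///     ]
    /// }
    /// ```
    ///
    /// With these sample tools, `find_tool(&tools, "calculator")` finds a match,
    /// while `"Calculator"` does not, because the comparison is case-sensitive.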
    ///
    /// # Use Cases
    ///
    /// - Manual tool execution in response to `ToolUseBlock`
    /// - Validating tool availability before offering features
    /// - Inspecting tool metadata (name, description, schema)
    ///
    /// # Examples
    ///
    /// ## Execute Tool Manually
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// # client.send("test").await?;
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         if let Some(tool) = client.get_tool(tool_use.name()) {
    ///             // Execute the tool
    ///             let result = tool.execute(tool_use.input().clone()).await?;
    ///             client.add_tool_result(tool_use.id(), result)?;
    ///             client.send("").await?;
    ///         } else {
    ///             println!("Unknown tool: {}", tool_use.name());
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Check Tool Availability
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let client = Client::new(AgentOptions::default())?;
    /// if client.get_tool("calculator").is_some() {
    ///     println!("Calculator is available");
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn get_tool(&self, name: &str) -> Option<&crate::tools::Tool> {
        // Search registered tools by name
        self.options
            .tools()
            .iter()
            .find(|t| t.name() == name)
            .map(|t| t.as_ref())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_client_creation() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        assert_eq!(client.history().len(), 0);
    }

    #[test]
    fn test_client_new_returns_result() {
        // Test that Client::new() returns Result instead of panicking
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        // This should not panic - it should return Ok(client)
        let result = Client::new(options);
        assert!(result.is_ok(), "Client::new() should return Ok");

        let client = result.unwrap();
        assert_eq!(client.history().len(), 0);
    }

    #[test]
    fn test_interrupt_flag_initial_state() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        // Initially not interrupted
        assert!(!client.interrupted.load(Ordering::SeqCst));
    }

    #[test]
    fn test_interrupt_sets_flag() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));
    }

    #[test]
    fn test_interrupt_idempotent() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));

        // Call again - should still be interrupted
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_receive_returns_none_when_interrupted() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // Interrupt before receiving
        client.interrupt();

        // receive() should return Ok(None) when interrupted
        let result = client.receive().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_receive_returns_ok_none_when_no_stream() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // No stream started - receive() should return Ok(None)
        let result = client.receive().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_receive_error_propagation() {
        // receive() reports errors as Err(_), not as Some(Err(_)).
        // Exercising the Err path needs a mock stream that produces errors;
        // until then, this test pins the signature.
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // Signature check: receive() returns Result<Option<ContentBlock>>
        // This means we can use ? operator cleanly:
        // while let Some(block) = client.receive().await? { ... }

        // Type assertion: this compiles only if the signature is correct.
        // With no stream started, the value is Ok(None).
        let result: Result<Option<ContentBlock>> = client.receive().await;
        assert!(matches!(result, Ok(None)));
    }

    #[test]
    fn test_empty_content_parts_protection() {
        // Test for Issue #3 - Verify empty content_parts causes appropriate handling
        // This documents expected behavior: messages with images should have content

        use crate::types::{ContentBlock, ImageBlock, Message, MessageRole};

        // GIVEN: Message with an image
        let img = ImageBlock::from_url("https://example.com/test.jpg").expect("Valid URL");

        let msg = Message::new(MessageRole::User, vec![ContentBlock::Image(img)]);

        // WHEN: Building content_parts
        let mut content_parts = Vec::new();
        for block in &msg.content {
            match block {
                ContentBlock::Text(text) => {
                    content_parts.push(crate::types::OpenAIContentPart::text(&text.text));
                }
                ContentBlock::Image(image) => {
                    content_parts.push(crate::types::OpenAIContentPart::from_image(image));
                }
                ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
            }
        }

        // THEN: content_parts should not be empty
        assert!(
            !content_parts.is_empty(),
            "Messages with images should produce non-empty content_parts"
        );
    }
}