open_agent/
client.rs

1//! Client for streaming queries and multi-turn conversations
2//!
3//! This module provides the core streaming client implementation for the Open Agent SDK.
4//! It handles communication with OpenAI-compatible APIs, manages conversation history,
5//! and provides two modes of operation: manual and automatic tool execution.
6//!
7//! # Architecture Overview
8//!
9//! The SDK implements a **streaming-first architecture** where all responses from the model
10//! are received as a stream of content blocks. This design enables:
11//!
12//! - **Progressive rendering**: Display text as it arrives without waiting for completion
13//! - **Real-time tool execution**: Execute tools as they're requested by the model
14//! - **Interruption support**: Cancel operations mid-stream without corrupting state
15//! - **Memory efficiency**: Process large responses without buffering everything in memory
16//!
17//! ## Two Operating Modes
18//!
19//! ### 1. Manual Tool Execution Mode (default)
20//!
21//! In manual mode, the client streams content blocks directly to the caller. When the model
22//! requests a tool, the caller receives a `ToolUseBlock`, executes the tool, adds the result
23//! using `add_tool_result()`, and continues the conversation with another `send()` call.
24//!
25//! **Use cases**: Custom tool execution logic, interactive debugging, fine-grained control
26//!
27//! ### 2. Automatic Tool Execution Mode
28//!
//! When `auto_execute_tools` is enabled, the client automatically executes tools and continues
//! the conversation until it receives a text-only response or reaches `max_tool_iterations`.
//! The caller receives only the final text blocks after all tool iterations complete.
32//!
33//! **Use cases**: Simple agentic workflows, automated task completion, batch processing
34//!
35//! ## Request Flow
36//!
37//! ```text
38//! User sends prompt
39//!     │
40//!     ├─> UserPromptSubmit hook executes (can modify/block prompt)
41//!     │
42//!     ├─> Prompt added to history
43//!     │
44//!     ├─> HTTP request to OpenAI-compatible API
45//!     │
46//!     ├─> Response streamed as Server-Sent Events (SSE)
47//!     │
48//!     ├─> SSE chunks aggregated into ContentBlocks
49//!     │
50//!     └─> Blocks emitted to caller (or buffered for auto-execution)
51//! ```
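//!
//! As an illustration of the SSE step above, here is a minimal, std-only sketch of pulling
//! `data:` payloads out of an OpenAI-style event stream. The function name `sse_data_payloads`
//! is hypothetical and not part of this crate. The SDK's own parser (`parse_sse_stream`) works
//! incrementally on the HTTP body and may differ. The sketch assumes single-line `data:` fields.
//!
//! ```rust
//! /// Collects the payload of every `data:` line, stopping at the `[DONE]` sentinel
//! /// that OpenAI-compatible servers send to mark the end of a stream.
//! fn sse_data_payloads(body: &str) -> Vec<String> {
//!     let mut payloads = Vec::new();
//!     for line in body.lines() {
//!         // Only `data:` fields matter here; comment lines (`:`) and blank
//!         // event separators are skipped.
//!         let payload = match line.strip_prefix("data:") {
//!             Some(rest) => rest.trim(),
//!             None => continue,
//!         };
//!         if payload == "[DONE]" {
//!             break;
//!         }
//!         payloads.push(payload.to_string());
//!     }
//!     payloads
//! }
//! ```
//!
//! Each collected payload would then be deserialized as one JSON chunk and handed to the
//! aggregation step.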
52//!
53//! ## Tool Execution Flow
54//!
55//! ### Manual Mode:
56//! ```text
57//! receive() → ToolUseBlock
58//!     │
59//!     ├─> Caller executes tool
60//!     │
61//!     ├─> Caller calls add_tool_result()
62//!     │
63//!     ├─> Caller calls send("") to continue
64//!     │
65//!     └─> receive() → TextBlock (model's response)
66//! ```
67//!
68//! ### Auto Mode:
69//! ```text
70//! receive() triggers auto-execution loop
71//!     │
72//!     ├─> Collect all blocks from stream
73//!     │
74//!     ├─> For each ToolUseBlock:
75//!     │   ├─> PreToolUse hook executes (can modify/block)
76//!     │   ├─> Tool executed via Tool.execute()
77//!     │   ├─> PostToolUse hook executes (can modify result)
78//!     │   └─> Result added to history
79//!     │
80//!     ├─> Continue conversation with send("")
81//!     │
82//!     ├─> Repeat until text-only response or max iterations
83//!     │
84//!     └─> Return text blocks one-by-one via receive()
85//! ```
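//!
//! The auto-mode loop above can be sketched without any networking. Everything in this block
//! (`Turn`, `run_auto_loop`, the closures) is a hypothetical stand-in, not the crate's API, and
//! returning `None` when the iteration limit is hit is an assumption about the cutoff behavior.
//!
//! ```rust
//! /// Hypothetical, simplified stand-in for one model turn.
//! enum Turn {
//!     /// Names of the tools the model asked for.
//!     ToolCalls(Vec<String>),
//!     /// A text-only response, which ends the loop.
//!     Text(String),
//! }
//!
//! /// Executes requested tools and feeds results back until the model answers with
//! /// text, or gives up after `max_iterations` tool rounds.
//! fn run_auto_loop(
//!     mut next_turn: impl FnMut(&[String]) -> Turn,
//!     mut execute: impl FnMut(&str) -> String,
//!     max_iterations: usize,
//! ) -> Option<String> {
//!     let mut history: Vec<String> = Vec::new();
//!     let mut rounds = 0;
//!     loop {
//!         match next_turn(&history) {
//!             Turn::Text(text) => return Some(text),
//!             Turn::ToolCalls(names) => {
//!                 if rounds == max_iterations {
//!                     return None; // assumed behavior at the limit
//!                 }
//!                 rounds += 1;
//!                 for name in names {
//!                     // Each tool result is appended to the history that the
//!                     // next model turn sees.
//!                     history.push(execute(&name));
//!                 }
//!             }
//!         }
//!     }
//! }
//! ```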
86//!
87//! ## State Management
88//!
89//! The client maintains several pieces of state:
90//!
91//! - **history**: Full conversation history (`Vec<Message>`)
92//! - **current_stream**: Active SSE stream being consumed (`Option<ContentStream>`)
93//! - **interrupted**: Atomic flag for cancellation (`Arc<AtomicBool>`)
94//! - **auto_exec_buffer**: Buffered blocks for auto-execution mode (`Vec<ContentBlock>`)
//! - **auto_exec_index**: Current position in buffer (`usize`)
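//!
//! To make the buffer-plus-index pair concrete, here is a small sketch of the same cursor
//! pattern. `BlockBuffer` and its methods are hypothetical names chosen for illustration, and
//! blocks are plain strings rather than `ContentBlock` values.
//!
//! ```rust
//! /// Hypothetical miniature of the `auto_exec_buffer` / `auto_exec_index` pair.
//! struct BlockBuffer {
//!     blocks: Vec<String>,
//!     index: usize,
//! }
//!
//! impl BlockBuffer {
//!     /// Replaces the buffered blocks and rewinds the cursor, as would happen
//!     /// when a new response is buffered.
//!     fn refill(&mut self, blocks: Vec<String>) {
//!         self.blocks = blocks;
//!         self.index = 0;
//!     }
//!
//!     /// Returns the next block, or `None` once the buffer is drained.
//!     fn next_block(&mut self) -> Option<String> {
//!         let block = self.blocks.get(self.index).cloned()?;
//!         self.index += 1;
//!         Some(block)
//!     }
//! }
//! ```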
96//!
97//! ## Interruption Mechanism
98//!
//! The interrupt system uses `Arc<AtomicBool>` to enable thread-safe cancellation:
100//!
101//! ```rust,no_run
102//! # use open_agent::{Client, AgentOptions};
103//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
104//! let mut client = Client::new(AgentOptions::default())?;
105//! let handle = client.interrupt_handle(); // Clone Arc for use in other threads
106//!
107//! // In another thread or async task:
108//! tokio::spawn(async move {
109//!     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
110//!     handle.store(true, std::sync::atomic::Ordering::SeqCst);
111//! });
112//!
113//! client.send("Long request").await?;
114//! while let Some(block) = client.receive().await? {
115//!     // Will stop when interrupted
116//! }
117//! # Ok(())
118//! # }
119//! ```
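//!
//! The flag semantics can also be seen in isolation. The following std-only sketch uses
//! hypothetical helpers (`drain_until_interrupted`, `demo`) and plain OS threads instead of
//! tokio tasks. It models only the shared-flag check, not the client's stream handling.
//!
//! ```rust
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::sync::Arc;
//! use std::thread;
//!
//! /// Processes items until the shared flag is set, returning how many were handled.
//! fn drain_until_interrupted(items: &[u32], interrupted: &AtomicBool) -> usize {
//!     let mut handled = 0;
//!     for _item in items {
//!         // Checked before each item, analogous to checking before each block.
//!         if interrupted.load(Ordering::SeqCst) {
//!             break;
//!         }
//!         handled += 1;
//!     }
//!     handled
//! }
//!
//! /// Drains once with the flag clear, then again after another thread sets it.
//! fn demo() -> (usize, usize) {
//!     let flag = Arc::new(AtomicBool::new(false));
//!     let before = drain_until_interrupted(&[1, 2, 3], &flag);
//!
//!     // Another thread holds a clone of the Arc and flips the flag.
//!     let handle = Arc::clone(&flag);
//!     thread::spawn(move || handle.store(true, Ordering::SeqCst))
//!         .join()
//!         .expect("interrupt thread panicked");
//!
//!     let after = drain_until_interrupted(&[1, 2, 3], &flag);
//!     (before, after)
//! }
//! ```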
120//!
121//! ## Hook Integration
122//!
123//! Hooks provide extension points throughout the request lifecycle:
124//!
125//! - **UserPromptSubmit**: Called before sending user prompt (can modify or block)
126//! - **PreToolUse**: Called before executing each tool (can modify input or block execution)
127//! - **PostToolUse**: Called after tool execution (can modify result)
128//!
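//! Each hook fires only at its own point in the lifecycle: `UserPromptSubmit` when a prompt is
//! sent, and `PreToolUse`/`PostToolUse` around each tool call in the auto-execution loop. Hooks
//! have access to conversation history.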
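//!
//! One way to picture how a chain of hooks could combine is sketched below. This is an
//! assumption-laden illustration: `Decision`, `apply_hooks`, and the two sample hooks are
//! hypothetical, and the rule "a modification feeds later hooks, the first block wins" is
//! assumed here rather than taken from the crate's `Hooks` implementation.
//!
//! ```rust
//! /// Hypothetical, simplified decision type; the crate's `HookDecision` may differ.
//! enum Decision {
//!     Continue,
//!     Modify(String),
//!     Block(String),
//! }
//!
//! /// Applies hooks in order. A `Modify` rewrites the prompt seen by later hooks;
//! /// the first `Block` aborts with its reason.
//! fn apply_hooks(
//!     prompt: &str,
//!     hooks: &[fn(&str) -> Option<Decision>],
//! ) -> Result<String, String> {
//!     let mut current = prompt.to_string();
//!     for hook in hooks {
//!         match hook(&current) {
//!             Some(Decision::Block(reason)) => return Err(reason),
//!             Some(Decision::Modify(updated)) => current = updated,
//!             // A hook that returns `None` expresses no opinion.
//!             Some(Decision::Continue) | None => {}
//!         }
//!     }
//!     Ok(current)
//! }
//!
//! /// Sample hook: blocks prompts containing a forbidden word.
//! fn block_forbidden(prompt: &str) -> Option<Decision> {
//!     if prompt.contains("forbidden") {
//!         Some(Decision::Block("Forbidden word detected".to_string()))
//!     } else {
//!         Some(Decision::Continue)
//!     }
//! }
//!
//! /// Sample hook: strips surrounding whitespace from the prompt.
//! fn trim_prompt(prompt: &str) -> Option<Decision> {
//!     Some(Decision::Modify(prompt.trim().to_string()))
//! }
//! ```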
130//!
131//! ## Error Handling
132//!
//! Errors are propagated immediately and leave the client in a valid state. They fall into
//! the following categories:
134//!
135//! - **HTTP errors**: Network failures, timeouts, connection issues
136//! - **API errors**: Invalid model, authentication failures, rate limits
137//! - **Parse errors**: Malformed SSE responses, invalid JSON
138//! - **Tool errors**: Tool execution failures (converted to JSON error responses)
139//! - **Hook errors**: Hook execution failures or blocked operations
140//!
141//! After an error, the client remains usable for new requests.
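//!
//! The note that tool failures are converted to JSON error responses can be illustrated with a
//! std-only sketch. The helper names and the `{"error": ...}` shape are assumptions made for
//! this example; the crate's actual error payload is not shown in this module and may differ.
//!
//! ```rust
//! /// Escapes the characters that would break a JSON string literal.
//! fn escape_json(text: &str) -> String {
//!     let mut out = String::with_capacity(text.len());
//!     for ch in text.chars() {
//!         match ch {
//!             '"' => out.push_str("\\\""),
//!             '\\' => out.push_str("\\\\"),
//!             '\n' => out.push_str("\\n"),
//!             // Remaining control characters use the \uXXXX form.
//!             c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
//!             c => out.push(c),
//!         }
//!     }
//!     out
//! }
//!
//! /// Turns a tool outcome into the JSON text handed back to the model, so a
//! /// failing tool yields data for the model instead of aborting the conversation.
//! fn tool_outcome_to_json(outcome: Result<String, String>) -> String {
//!     match outcome {
//!         Ok(json) => json,
//!         Err(message) => format!("{{\"error\": \"{}\"}}", escape_json(&message)),
//!     }
//! }
//! ```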
142//!
143//! # Examples
144//!
145//! ## Simple Single-Turn Query
146//!
147//! ```rust,no_run
148//! use open_agent::{query, AgentOptions};
149//! use futures::StreamExt;
150//!
151//! #[tokio::main]
152//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
153//!     let options = AgentOptions::builder()
154//!         .model("gpt-4")
155//!         .api_key("sk-...")
156//!         .build()?;
157//!
158//!     let mut stream = query("What is Rust?", &options).await?;
159//!
160//!     while let Some(block) = stream.next().await {
161//!         if let open_agent::ContentBlock::Text(text) = block? {
162//!             print!("{}", text.text);
163//!         }
164//!     }
165//!
166//!     Ok(())
167//! }
168//! ```
169//!
170//! ## Multi-Turn Conversation
171//!
172//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock};
175//!
176//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
177//! let mut client = Client::new(AgentOptions::builder()
178//!     .model("gpt-4")
179//!     .api_key("sk-...")
180//!     .build()?)?;
181//!
182//! // First question
183//! client.send("What's the capital of France?").await?;
184//! while let Some(block) = client.receive().await? {
185//!     if let ContentBlock::Text(text) = block {
186//!         println!("{}", text.text);
187//!     }
188//! }
189//!
190//! // Follow-up question (history is maintained)
191//! client.send("What's its population?").await?;
192//! while let Some(block) = client.receive().await? {
193//!     if let ContentBlock::Text(text) = block {
194//!         println!("{}", text.text);
195//!     }
196//! }
197//! # Ok(())
198//! # }
199//! ```
200//!
201//! ## Manual Tool Execution
202//!
203//! ```rust,no_run
204//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
205//! use serde_json::json;
206//!
207//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
208//! let calculator = Tool::new(
209//!     "calculator",
210//!     "Performs arithmetic operations",
211//!     json!({"type": "object", "properties": {"operation": {"type": "string"}}}),
212//!     |input| Box::pin(async move {
213//!         // Custom execution logic
214//!         Ok(json!({"result": 42}))
215//!     })
216//! );
217//!
218//! let mut client = Client::new(AgentOptions::builder()
219//!     .model("gpt-4")
220//!     .api_key("sk-...")
221//!     .tools(vec![calculator])
222//!     .build()?)?;
223//!
224//! client.send("Calculate 2+2").await?;
225//!
226//! while let Some(block) = client.receive().await? {
227//!     match block {
228//!         ContentBlock::ToolUse(tool_use) => {
229//!             println!("Model wants to use: {}", tool_use.name);
230//!
231//!             // Execute tool manually
232//!             let tool = client.get_tool(&tool_use.name).unwrap();
233//!             let result = tool.execute(tool_use.input).await?;
234//!
235//!             // Add result and continue
236//!             client.add_tool_result(&tool_use.id, result)?;
237//!             client.send("").await?;
238//!         }
239//!         ContentBlock::Text(text) => {
240//!             println!("Response: {}", text.text);
241//!         }
242//!         _ => {}
243//!     }
244//! }
245//! # Ok(())
246//! # }
247//! ```
248//!
249//! ## Automatic Tool Execution
250//!
251//! ```rust,no_run
252//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
253//! use serde_json::json;
254//!
255//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
256//! let calculator = Tool::new(
257//!     "calculator",
258//!     "Performs arithmetic operations",
259//!     json!({"type": "object"}),
260//!     |input| Box::pin(async move { Ok(json!({"result": 42})) })
261//! );
262//!
263//! let mut client = Client::new(AgentOptions::builder()
264//!     .model("gpt-4")
265//!     .api_key("sk-...")
266//!     .tools(vec![calculator])
267//!     .auto_execute_tools(true)  // Enable auto-execution
268//!     .max_tool_iterations(5)    // Max 5 tool rounds
269//!     .build()?)?;
270//!
271//! client.send("Calculate 2+2 and then multiply by 3").await?;
272//!
273//! // Tools are executed automatically - you only get final text response
274//! while let Some(block) = client.receive().await? {
275//!     if let ContentBlock::Text(text) = block {
276//!         println!("{}", text.text);
277//!     }
278//! }
279//! # Ok(())
280//! # }
281//! ```
282//!
283//! ## With Hooks
284//!
285//! ```ignore
286//! use open_agent::{Client, AgentOptions, Hooks, HookDecision};
287//!
288//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
289//! let hooks = Hooks::new()
290//!     .add_user_prompt_submit(|event| async move {
291//!         // Block prompts containing certain words
292//!         if event.prompt.contains("forbidden") {
293//!             return Some(HookDecision::block("Forbidden word detected"));
294//!         }
295//!         Some(HookDecision::continue_())
296//!     })
297//!     .add_pre_tool_use(|event| async move {
298//!         // Log all tool uses
299//!         println!("Executing tool: {}", event.tool_name);
300//!         Some(HookDecision::continue_())
301//!     });
302//!
303//! let mut client = Client::new(AgentOptions::builder()
304//!     .model("gpt-4")
305//!     .base_url("http://localhost:1234/v1")
306//!     .hooks(hooks)
307//!     .build()?)?;
308//!
309//! // Hooks will be executed automatically
310//! client.send("Hello!").await?;
311//! # Ok(())
312//! # }
313//! ```

use crate::types::{
    AgentOptions, ContentBlock, Message, MessageRole, OpenAIFunction, OpenAIMessage,
    OpenAIRequest, OpenAIToolCall, TextBlock,
};
use crate::utils::{ToolCallAggregator, parse_sse_stream};
use crate::{Error, Result};
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

327/// A pinned, boxed stream of content blocks from the model.
328///
329/// This type alias represents an asynchronous stream that yields `ContentBlock` items.
330/// Each item is wrapped in a `Result` to handle potential errors during streaming.
331///
332/// The stream is:
333/// - **Pinned** (`Pin<Box<...>>`): Required for safe async operations and self-referential types
334/// - **Boxed**: Allows dynamic dispatch and hides the concrete stream implementation
335/// - **Send**: Can be safely transferred between threads
336///
337/// # Content Blocks
338///
339/// The stream can yield several types of content blocks:
340///
341/// - **TextBlock**: Incremental text responses from the model
342/// - **ToolUseBlock**: Requests to execute a tool with specific parameters
343/// - **ToolResultBlock**: Results from tool execution (in manual mode)
344///
345/// # Error Handling
346///
347/// Errors in the stream indicate issues like:
348/// - Network failures or timeouts
349/// - Malformed SSE events
350/// - JSON parsing errors
351/// - API errors from the model provider
352///
353/// When an error occurs, the stream typically terminates. It's the caller's responsibility
354/// to handle errors appropriately.
355///
356/// # Examples
357///
358/// ```rust,no_run
359/// use open_agent::{query, AgentOptions, ContentBlock};
360/// use futures::StreamExt;
361///
362/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
363/// let options = AgentOptions::builder()
364///     .model("gpt-4")
365///     .api_key("sk-...")
366///     .build()?;
367///
368/// let mut stream = query("Hello!", &options).await?;
369///
370/// while let Some(result) = stream.next().await {
371///     match result {
372///         Ok(ContentBlock::Text(text)) => print!("{}", text.text),
373///         Ok(_) => {}, // Other block types
374///         Err(e) => eprintln!("Stream error: {}", e),
375///     }
376/// }
377/// # Ok(())
378/// # }
379/// ```
pub type ContentStream = Pin<Box<dyn Stream<Item = Result<ContentBlock>> + Send>>;
381
382/// Simple query function for single-turn interactions without conversation history.
383///
384/// This is a stateless convenience function for simple queries that don't require
385/// multi-turn conversations. It creates a temporary HTTP client, sends a single
386/// prompt, and returns a stream of content blocks.
387///
388/// For multi-turn conversations or more control over the interaction, use [`Client`] instead.
389///
390/// # Parameters
391///
392/// - `prompt`: The user's message to send to the model
393/// - `options`: Configuration including model, API key, tools, etc.
394///
395/// # Returns
396///
397/// Returns a `ContentStream` that yields content blocks as they arrive from the model.
398/// The stream must be polled to completion to receive all blocks.
399///
400/// # Behavior
401///
402/// 1. Creates a temporary HTTP client with configured timeout
403/// 2. Builds message array (system prompt + user prompt)
404/// 3. Converts tools to OpenAI format if provided
405/// 4. Makes HTTP POST request to `/chat/completions`
406/// 5. Parses Server-Sent Events (SSE) response stream
407/// 6. Aggregates chunks into complete content blocks
408/// 7. Returns stream that yields blocks as they complete
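///
/// Steps 6 and 7 rely on a stateful `scan` followed by a flattening step. The same shape can
/// be shown with std iterators; this is a sketch under simplifying assumptions, with a
/// hypothetical `aggregate_lines` function and newline-terminated strings standing in for
/// SSE chunks and completed content blocks.
///
/// ```rust
/// /// Joins fragments into complete lines. An incomplete fragment yields `None`
/// /// from the `scan` step and is dropped by `flatten`, mirroring how
/// /// intermediate chunks produce no blocks.
/// fn aggregate_lines(fragments: &[&str]) -> Vec<String> {
///     fragments
///         .iter()
///         .scan(String::new(), |pending, fragment| {
///             pending.push_str(fragment);
///             if pending.ends_with('\n') {
///                 let line = pending.trim_end().to_string();
///                 pending.clear();
///                 Some(Some(line)) // complete block ready
///             } else {
///                 Some(None) // still accumulating; keep the iterator alive
///             }
///         })
///         .flatten()
///         .collect()
/// }
/// ```
///
/// The outer `Some` keeps `scan` running, while the inner `Option` says whether anything is
/// ready to emit. A trailing fragment that never completes is simply not emitted.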
409///
410/// # Error Handling
411///
412/// This function can return errors for:
413/// - HTTP client creation failures
414/// - Network errors during the request
415/// - API errors (authentication, invalid model, rate limits, etc.)
416/// - SSE parsing errors
417/// - JSON deserialization errors
418///
419/// # Performance Notes
420///
421/// - Creates a new HTTP client for each call (consider using `Client` for repeated queries)
422/// - Timeout is configurable via `AgentOptions::timeout` (default: 120 seconds)
423/// - Streaming begins immediately; no buffering of the full response
424///
425/// # Examples
426///
427/// ## Basic Usage
428///
429/// ```rust,no_run
430/// use open_agent::{query, AgentOptions};
431/// use futures::StreamExt;
432///
433/// #[tokio::main]
434/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
435///     let options = AgentOptions::builder()
436///         .system_prompt("You are a helpful assistant")
437///         .model("gpt-4")
438///         .api_key("sk-...")
439///         .build()?;
440///
441///     let mut stream = query("What's the capital of France?", &options).await?;
442///
443///     while let Some(block) = stream.next().await {
444///         match block? {
445///             open_agent::ContentBlock::Text(text) => {
446///                 print!("{}", text.text);
447///             }
448///             _ => {}
449///         }
450///     }
451///
452///     Ok(())
453/// }
454/// ```
455///
456/// ## With Tools
457///
458/// ```rust,no_run
459/// use open_agent::{query, AgentOptions, Tool, ContentBlock};
460/// use futures::StreamExt;
461/// use serde_json::json;
462///
463/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
464/// let calculator = Tool::new(
465///     "calculator",
466///     "Performs calculations",
467///     json!({"type": "object"}),
468///     |input| Box::pin(async move { Ok(json!({"result": 42})) })
469/// );
470///
471/// let options = AgentOptions::builder()
472///     .model("gpt-4")
473///     .api_key("sk-...")
474///     .tools(vec![calculator])
475///     .build()?;
476///
477/// let mut stream = query("Calculate 2+2", &options).await?;
478///
479/// while let Some(block) = stream.next().await {
480///     match block? {
481///         ContentBlock::ToolUse(tool_use) => {
482///             println!("Model wants to use: {}", tool_use.name);
483///             // Note: You'll need to manually execute tools and continue
484///             // the conversation. For automatic execution, use Client.
485///         }
486///         ContentBlock::Text(text) => print!("{}", text.text),
487///         _ => {}
488///     }
489/// }
490/// # Ok(())
491/// # }
492/// ```
493///
494/// ## Error Handling
495///
496/// ```rust,no_run
497/// use open_agent::{query, AgentOptions};
498/// use futures::StreamExt;
499///
500/// # async fn example() {
501/// let options = AgentOptions::builder()
502///     .model("gpt-4")
503///     .api_key("invalid-key")
504///     .build()
505///     .unwrap();
506///
507/// match query("Hello", &options).await {
508///     Ok(mut stream) => {
509///         while let Some(result) = stream.next().await {
510///             match result {
511///                 Ok(block) => println!("Block: {:?}", block),
512///                 Err(e) => {
513///                     eprintln!("Stream error: {}", e);
514///                     break;
515///                 }
516///             }
517///         }
518///     }
519///     Err(e) => eprintln!("Query failed: {}", e),
520/// }
521/// # }
522/// ```
pub async fn query(prompt: &str, options: &AgentOptions) -> Result<ContentStream> {
    // Create HTTP client with configured timeout
    // The timeout applies to the entire request, not individual chunks
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(options.timeout()))
        .build()
        .map_err(Error::Http)?;

    // Build messages array for the API request
    // OpenAI format expects an array of message objects with role and content
    let mut messages = Vec::new();

    // Add system prompt if provided
    // System prompts set the assistant's behavior and context
    if !options.system_prompt().is_empty() {
        messages.push(OpenAIMessage {
            role: "system".to_string(),
            content: options.system_prompt().to_string(),
            tool_calls: None,
            tool_call_id: None,
        });
    }

    // Add user prompt
    // This is the actual query from the user
    messages.push(OpenAIMessage {
        role: "user".to_string(),
        content: prompt.to_string(),
        tool_calls: None,
        tool_call_id: None,
    });

    // Convert tools to OpenAI format if any are provided
    // Tools are described using JSON Schema for parameter validation
    let tools = if !options.tools().is_empty() {
        Some(
            options
                .tools()
                .iter()
                .map(|t| t.to_openai_format())
                .collect(),
        )
    } else {
        None
    };

    // Build the OpenAI-compatible request payload
    // stream=true enables Server-Sent Events for incremental responses
    let request = OpenAIRequest {
        model: options.model().to_string(),
        messages,
        stream: true, // Critical: enables SSE streaming
        max_tokens: options.max_tokens(),
        temperature: Some(options.temperature()),
        tools,
    };

    // Make HTTP POST request to the chat completions endpoint
    let url = format!("{}/chat/completions", options.base_url());
    let response = client
        .post(&url)
        .header("Authorization", format!("Bearer {}", options.api_key()))
        .header("Content-Type", "application/json")
        .json(&request)
        .send()
        .await
        .map_err(Error::Http)?;

    // Check for HTTP-level errors before processing the stream
    // This catches authentication failures, rate limits, invalid models, etc.
    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await.unwrap_or_else(|e| {
            eprintln!("WARNING: Failed to read error response body: {}", e);
            "Unknown error (failed to read response body)".to_string()
        });
        return Err(Error::api(format!("API error {}: {}", status, body)));
    }

    // Parse the Server-Sent Events (SSE) stream
    // The response body is a stream of "data: {...}" events
    let sse_stream = parse_sse_stream(response);

    // Aggregate SSE chunks into complete content blocks
    // ToolCallAggregator handles partial JSON and assembles complete tool calls
    // The scan() combinator maintains state across stream items
    let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
        let result = match chunk_result {
            Ok(chunk) => match aggregator.process_chunk(chunk) {
                Ok(blocks) => {
                    if blocks.is_empty() {
                        Some(None) // Intermediate chunk, continue streaming
                    } else {
                        Some(Some(Ok(blocks))) // Complete block(s) ready
                    }
                }
                Err(e) => Some(Some(Err(e))), // Propagate processing error
            },
            Err(e) => Some(Some(Err(e))), // Propagate stream error
        };
        futures::future::ready(result)
    });

    // Flatten the stream to emit individual blocks
    // filter_map removes None values (incomplete chunks)
    // flat_map expands Vec<ContentBlock> into individual items
    let flattened = stream
        .filter_map(|item| async move { item })
        .flat_map(|result| {
            futures::stream::iter(match result {
                Ok(blocks) => blocks.into_iter().map(Ok).collect(),
                Err(e) => vec![Err(e)],
            })
        });

    // Pin and box the stream for type erasure and safe async usage
    Ok(Box::pin(flattened))
}
641
642/// Stateful client for multi-turn conversations with automatic history management.
643///
644/// The `Client` is the primary interface for building conversational AI applications.
645/// It maintains conversation history, manages streaming responses, and provides two
646/// modes of operation: manual and automatic tool execution.
647///
648/// # State Management
649///
650/// The client maintains several pieces of state that persist across multiple turns:
651///
652/// - **Conversation History**: Complete record of all messages exchanged
653/// - **Active Stream**: Currently active SSE stream being consumed
654/// - **Interrupt Flag**: Thread-safe cancellation signal
655/// - **Auto-Execution Buffer**: Cached blocks for auto-execution mode
656///
657/// # Operating Modes
658///
659/// ## Manual Mode (default)
660///
661/// In manual mode, the client streams blocks directly to the caller. When the model
662/// requests a tool, you receive a `ToolUseBlock`, execute the tool yourself, add the
663/// result with `add_tool_result()`, and continue the conversation.
664///
665/// **Advantages**:
666/// - Full control over tool execution
667/// - Custom error handling per tool
668/// - Ability to modify tool inputs/outputs
669/// - Interactive debugging capabilities
670///
671/// ## Automatic Mode (`auto_execute_tools = true`)
672///
673/// In automatic mode, the client executes tools transparently and only returns the
674/// final text response after all tool iterations complete.
675///
676/// **Advantages**:
677/// - Simpler API for common use cases
678/// - Built-in retry logic via hooks
679/// - Automatic conversation continuation
680/// - Configurable iteration limits
681///
682/// # Thread Safety
683///
684/// The client is NOT thread-safe for concurrent use. However, the interrupt mechanism
685/// uses `Arc<AtomicBool>` which can be safely shared across threads to signal cancellation.
686///
687/// # Memory Management
688///
689/// - History grows unbounded by default (consider clearing periodically)
690/// - Streams are consumed lazily (low memory footprint during streaming)
691/// - Auto-execution buffers entire response (higher memory in auto mode)
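///
/// For long-running sessions, one alternative to clearing the history outright is to keep
/// only a recent window. The sketch below is illustrative only: `Msg` and `trim_history` are
/// hypothetical and not provided by this crate, and a real implementation would also need to
/// avoid separating a tool call from its result.
///
/// ```rust
/// /// Hypothetical, simplified message type used only for this sketch.
/// #[derive(Clone, Debug, PartialEq)]
/// struct Msg {
///     role: &'static str,
///     text: String,
/// }
///
/// /// Keeps every system message plus the `keep_last` most recent other messages.
/// fn trim_history(history: &[Msg], keep_last: usize) -> Vec<Msg> {
///     let non_system = history.iter().filter(|m| m.role != "system").count();
///     // Number of oldest non-system messages to drop.
///     let mut to_skip = non_system.saturating_sub(keep_last);
///     history
///         .iter()
///         .filter(|m| {
///             if m.role == "system" {
///                 return true;
///             }
///             if to_skip > 0 {
///                 to_skip -= 1;
///                 return false;
///             }
///             true
///         })
///         .cloned()
///         .collect()
/// }
/// ```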
692///
693/// # Examples
694///
695/// ## Basic Multi-Turn Conversation
696///
697/// ```rust,no_run
698/// use open_agent::{Client, AgentOptions, ContentBlock};
699///
700/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
701/// let mut client = Client::new(AgentOptions::builder()
702///     .model("gpt-4")
703///     .api_key("sk-...")
704///     .build()?)?;
705///
706/// // First question
707/// client.send("What's the capital of France?").await?;
708/// while let Some(block) = client.receive().await? {
709///     if let ContentBlock::Text(text) = block {
710///         println!("{}", text.text); // "Paris is the capital of France."
711///     }
712/// }
713///
714/// // Follow-up question - history is automatically maintained
715/// client.send("What's its population?").await?;
716/// while let Some(block) = client.receive().await? {
717///     if let ContentBlock::Text(text) = block {
718///         println!("{}", text.text); // "Paris has approximately 2.2 million people."
719///     }
720/// }
721/// # Ok(())
722/// # }
723/// ```
724///
725/// ## Manual Tool Execution
726///
727/// ```rust,no_run
728/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
729/// use serde_json::json;
730///
731/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
732/// let calculator = Tool::new(
733///     "calculator",
734///     "Performs arithmetic",
735///     json!({"type": "object"}),
736///     |input| Box::pin(async move { Ok(json!({"result": 42})) })
737/// );
738///
739/// let mut client = Client::new(AgentOptions::builder()
740///     .model("gpt-4")
741///     .api_key("sk-...")
742///     .tools(vec![calculator])
743///     .build()?)?;
744///
745/// client.send("What's 2+2?").await?;
746///
747/// while let Some(block) = client.receive().await? {
748///     match block {
749///         ContentBlock::ToolUse(tool_use) => {
750///             // Execute tool manually
751///             let result = json!({"result": 4});
752///             client.add_tool_result(&tool_use.id, result)?;
753///
754///             // Continue conversation to get model's response
755///             client.send("").await?;
756///         }
757///         ContentBlock::Text(text) => {
758///             println!("{}", text.text); // "The result is 4."
759///         }
760///         _ => {}
761///     }
762/// }
763/// # Ok(())
764/// # }
765/// ```
766///
767/// ## Automatic Tool Execution
768///
769/// ```rust,no_run
770/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
771/// use serde_json::json;
772///
773/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
774/// let calculator = Tool::new(
775///     "calculator",
776///     "Performs arithmetic",
777///     json!({"type": "object"}),
778///     |input| Box::pin(async move { Ok(json!({"result": 42})) })
779/// );
780///
781/// let mut client = Client::new(AgentOptions::builder()
782///     .model("gpt-4")
783///     .api_key("sk-...")
784///     .tools(vec![calculator])
785///     .auto_execute_tools(true)  // Enable auto-execution
786///     .build()?)?;
787///
788/// client.send("What's 2+2?").await?;
789///
790/// // Tools execute automatically - you only receive final text
791/// while let Some(block) = client.receive().await? {
792///     if let ContentBlock::Text(text) = block {
793///         println!("{}", text.text); // "The result is 4."
794///     }
795/// }
796/// # Ok(())
797/// # }
798/// ```
799///
800/// ## With Interruption
801///
802/// ```rust,no_run
803/// use open_agent::{Client, AgentOptions};
804/// use std::time::Duration;
805///
806/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
807/// let mut client = Client::new(AgentOptions::default())?;
808///
809/// // Start a long-running query
810/// client.send("Write a very long story").await?;
811///
812/// // Spawn a task to interrupt after timeout
813/// let interrupt_handle = client.interrupt_handle();
814/// tokio::spawn(async move {
815///     tokio::time::sleep(Duration::from_secs(5)).await;
816///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
817/// });
818///
819/// // This loop will stop when interrupted
820/// while let Some(block) = client.receive().await? {
821///     // Process blocks...
822/// }
823///
824/// // Client is still usable after interruption
825/// client.send("What's 2+2?").await?;
826/// # Ok(())
827/// # }
828/// ```
829pub struct Client {
830    /// Configuration options including model, API key, tools, hooks, etc.
831    ///
    /// This field contains all the settings that control how the client behaves.
    /// It's set once during construction and cannot be modified (though you can
    /// access it via `options()` for inspection).
    options: AgentOptions,

    /// Complete conversation history as a sequence of messages.
    ///
    /// Each message contains a role (System/User/Assistant/Tool) and content blocks.
    /// History grows unbounded by default - use `clear_history()` to reset.
    ///
    /// **Important**: The history includes ALL messages, not just user/assistant.
    /// This includes tool results and intermediate assistant messages from tool calls.
    history: Vec<Message>,

    /// Currently active SSE stream being consumed.
    ///
    /// This is `Some(stream)` while a response is being received, and `None` when
    /// no request is in flight or after a response completes.
    ///
    /// The stream is set by `send()` and consumed by `receive()`. When the stream
    /// is exhausted, `receive()` returns `Ok(None)` and sets this back to `None`.
    current_stream: Option<ContentStream>,

    /// Reusable HTTP client for making API requests.
    ///
    /// Configured once during construction with the timeout from `AgentOptions`.
    /// Reusing the same client across requests enables connection pooling and
    /// better performance for multi-turn conversations.
    http_client: reqwest::Client,

    /// Thread-safe interrupt flag for cancellation.
    ///
    /// This `Arc<AtomicBool>` can be cloned and shared across threads or async tasks
    /// to signal cancellation. When set to `true`, the next `receive()` call will
    /// return `Ok(None)` and clear the current stream.
    ///
    /// The flag is automatically reset to `false` at the start of each `send()` call.
    ///
    /// **Thread Safety**: Can be safely accessed from multiple threads using atomic
    /// operations. However, only one thread should call `send()`/`receive()`.
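    ///
    /// A minimal sketch of that sharing pattern, using only `std`. The
    /// `next_item` and `interrupt_from_another_thread` helpers are hypothetical
    /// stand-ins for `receive()` and a caller-side cancel handle; they are not
    /// part of this crate.
    ///
    /// ```rust
    /// use std::sync::atomic::{AtomicBool, Ordering};
    /// use std::sync::Arc;
    /// use std::thread;
    ///
    /// // Hypothetical stand-in for `receive()`: checks the flag before yielding.
    /// fn next_item(queue: &mut std::vec::IntoIter<u32>, interrupted: &AtomicBool) -> Option<u32> {
    ///     if interrupted.load(Ordering::SeqCst) {
    ///         return None; // Interrupted: stop yielding, like `Ok(None)`
    ///     }
    ///     queue.next()
    /// }
    ///
    /// // Hypothetical cancel handle: sets a cloned flag from another thread.
    /// fn interrupt_from_another_thread(interrupted: &Arc<AtomicBool>) {
    ///     let handle = Arc::clone(interrupted);
    ///     thread::spawn(move || handle.store(true, Ordering::SeqCst))
    ///         .join()
    ///         .expect("interrupt thread panicked");
    /// }
    /// ```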
    interrupted: Arc<AtomicBool>,

    /// Buffer of content blocks for auto-execution mode.
    ///
    /// When `auto_execute_tools` is enabled, `receive()` internally calls the
    /// auto-execution loop which buffers all final text blocks here. Subsequent
    /// calls to `receive()` return blocks from this buffer one at a time.
    ///
    /// **Only used when `options.auto_execute_tools == true`**.
    ///
    /// The buffer is cleared when starting a new auto-execution loop.
    auto_exec_buffer: Vec<ContentBlock>,

    /// Current read position in the auto-execution buffer.
    ///
    /// Tracks which block to return next when `receive()` is called in auto mode.
    /// Reset to 0 when the buffer is refilled with a new response.
    ///
    /// **Only used when `options.auto_execute_tools == true`**.
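    ///
    /// A minimal sketch of the buffer-plus-cursor pattern, assuming a hypothetical
    /// `BlockBuffer` type with `String` items standing in for `ContentBlock`. It
    /// illustrates the idea only and is not this client's implementation.
    ///
    /// ```rust
    /// // Hypothetical buffer: `items` plays the role of `auto_exec_buffer`,
    /// // `index` the role of `auto_exec_index`.
    /// struct BlockBuffer {
    ///     items: Vec<String>,
    ///     index: usize,
    /// }
    ///
    /// impl BlockBuffer {
    ///     fn new() -> Self {
    ///         Self { items: Vec::new(), index: 0 }
    ///     }
    ///
    ///     // Replace the contents and rewind the cursor.
    ///     fn refill(&mut self, items: Vec<String>) {
    ///         self.items = items;
    ///         self.index = 0;
    ///     }
    ///
    ///     // Return the next buffered item, or `None` once the buffer is drained.
    ///     fn next_block(&mut self) -> Option<String> {
    ///         let item = self.items.get(self.index).cloned()?;
    ///         self.index += 1;
    ///         Some(item)
    ///     }
    /// }
    /// ```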
    auto_exec_index: usize,
}

impl Client {
    /// Creates a new client with the specified configuration.
    ///
    /// This constructor initializes all state fields and creates a reusable HTTP client
    /// configured with the timeout from `AgentOptions`.
    ///
    /// # Parameters
    ///
    /// - `options`: Configuration including model, API key, tools, hooks, etc.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot be built, for example when the
    /// TLS backend cannot be initialized or the system resolver configuration
    /// cannot be loaded. The timeout itself cannot fail here: any `u64` number of
    /// seconds is a valid `Duration`.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use open_agent::{Client, AgentOptions};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::builder()
    ///     .model("gpt-4")
    ///     .base_url("http://localhost:1234/v1")
    ///     .build()?)?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(options: AgentOptions) -> Result<Self> {
        // Build HTTP client with configured timeout
        // This client is reused across all requests for connection pooling
        let http_client = reqwest::Client::builder()
            .timeout(Duration::from_secs(options.timeout()))
            .build()
            .map_err(|e| Error::config(format!("Failed to build HTTP client: {}", e)))?;

        Ok(Self {
            options,
            history: Vec::new(),  // Empty conversation history
            current_stream: None, // No active stream yet
            http_client,
            interrupted: Arc::new(AtomicBool::new(false)), // Not interrupted initially
            auto_exec_buffer: Vec::new(),                  // Empty buffer for auto mode
            auto_exec_index: 0,                            // Start at beginning of buffer
        })
    }

    /// Sends a user message and initiates streaming of the model's response.
    ///
    /// This method performs several critical steps:
    ///
    /// 1. Executes UserPromptSubmit hooks (which can modify or block the prompt)
    /// 2. Adds the user message to conversation history
    /// 3. Builds and sends HTTP request to the OpenAI-compatible API
    /// 4. Parses the SSE stream and sets up aggregation
    /// 5. Stores the stream for consumption via `receive()`
    ///
    /// # Parameters
    ///
    /// - `prompt`: The user's message. Can be empty to continue conversation after
    ///   adding tool results (common pattern in manual tool execution mode).
    ///
    /// # Returns
    ///
    /// - `Ok(())`: Request sent successfully, call `receive()` to get blocks
    /// - `Err(e)`: Request failed (network error, API error, hook blocked, etc.)
    ///
    /// # Behavior Details
    ///
    /// ## Hook Execution
    ///
    /// Before sending, UserPromptSubmit hooks are executed. Hooks can:
    /// - Modify the prompt text
    /// - Block the request entirely
    /// - Inspect a history snapshot (currently one empty JSON object per message,
    ///   so only the message count is meaningful)
    ///
    /// If a hook blocks the request, this method returns an error immediately.
    ///
    /// ## History Management
    ///
    /// The prompt is added to history BEFORE sending the request. This ensures
    /// that history is consistent even if the request fails.
    ///
    /// ## Stream Setup
    ///
    /// The response stream is set up but not consumed. You must call `receive()`
    /// repeatedly to get content blocks. The stream remains active until:
    /// - All blocks are consumed (stream naturally ends)
    /// - An error occurs
    /// - Interrupt is triggered
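    ///
    /// The aggregation behind this stream follows a scan, filter, flatten shape:
    /// carry partial chunks in state, drop the steps where nothing is ready yet,
    /// then emit completed blocks one at a time. A minimal synchronous sketch of
    /// that shape with `std` iterators, assuming a hypothetical `aggregate`
    /// function that treats a `"\n"` fragment as the end of a block (the real code
    /// uses `ToolCallAggregator` over an async stream):
    ///
    /// ```rust
    /// // Hypothetical aggregator: joins string fragments until a "\n" fragment
    /// // closes the current block. An unterminated tail is never emitted.
    /// fn aggregate(chunks: Vec<&str>) -> Vec<String> {
    ///     chunks
    ///         .into_iter()
    ///         // scan: carry the partial block across chunks
    ///         .scan(String::new(), |partial, chunk| {
    ///             if chunk == "\n" {
    ///                 // Block complete: hand it over and reset the state
    ///                 Some(Some(std::mem::take(partial)))
    ///             } else {
    ///                 partial.push_str(chunk);
    ///                 Some(None) // Partial chunk, keep aggregating
    ///             }
    ///         })
    ///         // flatten: drop the `None` steps, keep completed blocks
    ///         .flatten()
    ///         .collect()
    /// }
    /// ```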
    ///
    /// ## Interrupt Handling
    ///
    /// The interrupt flag is reset to `false` at the start of this method,
    /// allowing a fresh request after a previous interruption.
    ///
    /// # State Changes
    ///
    /// - Resets `interrupted` flag to `false`
    /// - Appends user message to `history`
    /// - Sets `current_stream` to new SSE stream
    /// - Does NOT modify `auto_exec_buffer` or `auto_exec_index`
    ///
    /// # Errors
    ///
    /// Returns errors for:
    /// - Hook blocking the prompt
    /// - HTTP client errors (network failure, DNS, etc.)
    /// - API errors (auth failure, invalid model, rate limits)
    /// - Invalid response format
    ///
    /// After an error, the client remains usable for new requests.
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Hello!").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     // Process blocks...
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Continuing After Tool Result
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # use serde_json::json;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Use the calculator").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         // Execute tool and add result
    ///         client.add_tool_result(&tool_use.id, json!({"result": 42}))?;
    ///
    ///         // Continue conversation with empty prompt
    ///         client.send("").await?;
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn send(&mut self, prompt: &str) -> Result<()> {
        use crate::hooks::UserPromptSubmitEvent;

        // Reset interrupt flag for new query
        // This allows the client to be reused after a previous interruption
        // Uses SeqCst ordering to ensure visibility across all threads
        self.interrupted.store(false, Ordering::SeqCst);

        // Execute UserPromptSubmit hooks
        // Hooks run BEFORE adding to history, allowing modification or blocking
        let mut final_prompt = prompt.to_string();
        let history_snapshot: Vec<serde_json::Value> = self
            .history
            .iter()
            .map(|_| serde_json::json!({})) // Simplified snapshot for hooks
            .collect();

        // Create hook event with current prompt and history
        let event = UserPromptSubmitEvent::new(final_prompt.clone(), history_snapshot);

        // Execute all registered UserPromptSubmit hooks
        if let Some(decision) = self.options.hooks().execute_user_prompt_submit(event).await {
            // Check if hook wants to block execution
            if !decision.continue_execution() {
                return Err(Error::other(format!(
                    "Prompt blocked by hook: {}",
                    decision.reason().unwrap_or("")
                )));
            }
            // Apply any prompt modifications from hooks
            if let Some(modified) = decision.modified_prompt() {
                final_prompt = modified.to_string();
            }
        }

        // Add user message to history BEFORE sending request
        // This ensures history consistency even if request fails
        // Empty prompts are still added (needed for tool continuation)
        self.history.push(Message::user(final_prompt));

        // Build messages array for API request
        // This includes system prompt + full conversation history
        let mut messages = Vec::new();

        // Add system prompt as first message if configured
        // System prompts are added fresh for each request (not from history)
        if !self.options.system_prompt().is_empty() {
            messages.push(OpenAIMessage {
                role: "system".to_string(),
                content: self.options.system_prompt().to_string(),
                tool_calls: None,
                tool_call_id: None,
            });
        }

        // Convert conversation history to OpenAI message format
        // This includes user prompts, assistant responses, and tool results
        for msg in &self.history {
            // Separate blocks by type to determine message structure
            let mut text_blocks = Vec::new();
            let mut tool_use_blocks = Vec::new();
            let mut tool_result_blocks = Vec::new();

            for block in &msg.content {
                match block {
                    ContentBlock::Text(text) => text_blocks.push(text),
                    ContentBlock::ToolUse(tool_use) => tool_use_blocks.push(tool_use),
                    ContentBlock::ToolResult(tool_result) => tool_result_blocks.push(tool_result),
                }
            }

            // Handle different message types based on content blocks
            // Case 1: Message contains tool results (should be separate tool messages)
            if !tool_result_blocks.is_empty() {
                for tool_result in tool_result_blocks {
                    // Serialize the tool result content as JSON string
                    let content = serde_json::to_string(&tool_result.content)
                        .unwrap_or_else(|e| {
                            format!("{{\"error\": \"Failed to serialize: {}\"}}", e)
                        });

                    messages.push(OpenAIMessage {
                        role: "tool".to_string(),
                        content,
                        tool_calls: None,
                        tool_call_id: Some(tool_result.tool_use_id.clone()),
                    });
                }
            }
            // Case 2: Message contains tool use blocks (assistant with tool calls)
            else if !tool_use_blocks.is_empty() {
                // Build tool_calls array
                let tool_calls: Vec<OpenAIToolCall> = tool_use_blocks
                    .iter()
                    .map(|tool_use| {
                        // Serialize the input as a JSON string (OpenAI API requirement)
                        let arguments = serde_json::to_string(&tool_use.input)
                            .unwrap_or_else(|_| "{}".to_string());

                        OpenAIToolCall {
                            id: tool_use.id.clone(),
                            call_type: "function".to_string(),
                            function: OpenAIFunction {
                                name: tool_use.name.clone(),
                                arguments,
                            },
                        }
                    })
                    .collect();

                // Extract any text content (some models include reasoning before tool calls)
                let content = text_blocks
                    .iter()
                    .map(|t| t.text.as_str())
                    .collect::<Vec<_>>()
                    .join("\n");

                messages.push(OpenAIMessage {
                    role: "assistant".to_string(),
                    content,
                    tool_calls: Some(tool_calls),
                    tool_call_id: None,
                });
            }
            // Case 3: Message contains only text (normal message)
            else {
                let content = text_blocks
                    .iter()
                    .map(|t| t.text.as_str())
                    .collect::<Vec<_>>()
                    .join("\n");

                let role_str = match msg.role {
                    MessageRole::System => "system",
                    MessageRole::User => "user",
                    MessageRole::Assistant => "assistant",
                    MessageRole::Tool => "tool",
                };

                messages.push(OpenAIMessage {
                    role: role_str.to_string(),
                    content,
                    tool_calls: None,
                    tool_call_id: None,
                });
            }
        }

        // Convert tools to OpenAI format if any are registered
        // Each tool is described with name, description, and JSON Schema parameters
        let tools = if !self.options.tools().is_empty() {
            Some(
                self.options
                    .tools()
                    .iter()
                    .map(|t| t.to_openai_format())
                    .collect(),
            )
        } else {
            None
        };

        // Build the OpenAI-compatible request payload
        let request = OpenAIRequest {
            model: self.options.model().to_string(),
            messages,
            stream: true, // Always stream for progressive rendering
            max_tokens: self.options.max_tokens(),
            temperature: Some(self.options.temperature()),
            tools,
        };

        // Make HTTP POST request to chat completions endpoint
        let url = format!("{}/chat/completions", self.options.base_url());
        let response = self
            .http_client
            .post(&url)
            .header(
                "Authorization",
                format!("Bearer {}", self.options.api_key()),
            )
            .header("Content-Type", "application/json")
            .json(&request)
            .send()
            .await
            .map_err(Error::Http)?;

        // Check for HTTP-level errors before processing stream
        // This catches authentication, rate limits, invalid models, etc.
        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_else(|e| {
                eprintln!("WARNING: Failed to read error response body: {}", e);
                "Unknown error (failed to read response body)".to_string()
            });
            return Err(Error::api(format!("API error {}: {}", status, body)));
        }

        // Parse Server-Sent Events (SSE) stream from response
        let sse_stream = parse_sse_stream(response);

        // Aggregate SSE chunks into complete content blocks
        // ToolCallAggregator maintains state to handle incremental JSON chunks
        // that may arrive split across multiple SSE events
        let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
            let result = match chunk_result {
                Ok(chunk) => match aggregator.process_chunk(chunk) {
                    Ok(blocks) => {
                        if blocks.is_empty() {
                            Some(None) // Partial chunk, keep aggregating
                        } else {
                            Some(Some(Ok(blocks))) // Complete block(s) ready
                        }
                    }
                    Err(e) => Some(Some(Err(e))), // Processing error
                },
                Err(e) => Some(Some(Err(e))), // Stream error
            };
            futures::future::ready(result)
        });

        // Flatten the stream to emit individual blocks
        // filter_map removes None values (partial chunks)
        // flat_map converts Vec<ContentBlock> to individual items
        let flattened = stream
            .filter_map(|item| async move { item })
            .flat_map(|result| {
                futures::stream::iter(match result {
                    Ok(blocks) => blocks.into_iter().map(Ok).collect(),
                    Err(e) => vec![Err(e)],
                })
            });

        // Store the stream for consumption via receive()
        // The stream is NOT consumed here - that happens in receive()
        self.current_stream = Some(Box::pin(flattened));

        Ok(())
    }

    /// Internal method that returns one block from the current stream.
    ///
    /// This is the core streaming logic extracted for reuse by both manual mode
    /// and auto-execution mode. It handles interrupt checking and stream consumption.
    ///
    /// # Returns
    ///
    /// - `Ok(Some(block))`: Successfully received a content block
    /// - `Ok(None)`: Stream ended naturally or was interrupted
    /// - `Err(e)`: An error occurred during streaming
    ///
    /// # State Changes
    ///
    /// - Sets `current_stream` to `None` if interrupted or stream ends
    /// - Does not modify history or other state
    ///
    /// # Implementation Notes
    ///
    /// This method checks the interrupt flag on every call, allowing responsive
    /// cancellation. The check uses SeqCst ordering for immediate visibility of
    /// interrupts from other threads.
    async fn receive_one(&mut self) -> Result<Option<ContentBlock>> {
        // Check interrupt flag before attempting to receive
        // Uses SeqCst to ensure we see the latest value from any thread
        if self.interrupted.load(Ordering::SeqCst) {
            // Clear the stream and return None to signal completion
            self.current_stream = None;
            return Ok(None);
        }

        // Poll the current stream if one exists
        let next = match self.current_stream.as_mut() {
            Some(stream) => stream.next().await,
            None => return Ok(None), // No active stream
        };

        match next {
            Some(Ok(block)) => Ok(Some(block)), // Got a block
            Some(Err(e)) => Err(e),             // Stream error
            None => {
                // Stream ended: clear it, as documented under "State Changes",
                // so an exhausted stream is never polled again
                self.current_stream = None;
                Ok(None)
            }
        }
    }

    /// Collects all blocks from the current stream into a vector.
    ///
    /// Internal helper for auto-execution mode. This method buffers the entire
    /// response in memory, which is necessary to determine if the response contains
    /// tool calls before returning anything to the caller.
    ///
    /// # Returns
    ///
    /// - `Ok(vec)`: Blocks collected until the stream ended or an interrupt stopped it
    /// - `Err(e)`: Stream error, or an interrupt detected right after a block arrived
    ///
    /// # Memory Usage
    ///
    /// This buffers the entire response, which can be large for long completions.
    /// Consider the memory implications when using auto-execution mode.
    ///
    /// # Interruption
    ///
    /// The interrupt flag is checked twice per block. `receive_one()` checks it
    /// before polling and ends the stream with `Ok(None)`, so collection stops early
    /// and the blocks gathered so far are returned. This method checks it again
    /// after each block arrives and returns an error if it is set at that point.
    async fn collect_all_blocks(&mut self) -> Result<Vec<ContentBlock>> {
        let mut blocks = Vec::new();

        // Consume entire stream into vector
        while let Some(block) = self.receive_one().await? {
            // Check interrupt during collection for responsiveness
            if self.interrupted.load(Ordering::SeqCst) {
                self.current_stream = None;
                return Err(Error::other(
                    "Operation interrupted during block collection",
                ));
            }

            blocks.push(block);
        }

        Ok(blocks)
    }

    /// Executes a tool by name with the given input.
    ///
    /// Internal helper for auto-execution mode. Looks up the tool in the registered
    /// tools list and executes it with the provided input.
    ///
    /// # Parameters
    ///
    /// - `tool_name`: Name of the tool to execute
    /// - `input`: JSON value containing tool parameters
    ///
    /// # Returns
    ///
    /// - `Ok(result)`: Tool executed successfully, returns result as JSON
    /// - `Err(e)`: Tool not found or execution failed
    ///
    /// # Error Handling
    ///
    /// If the tool is not found in the registry, returns a ToolError.
    /// If execution fails, the error from the tool is propagated.
    async fn execute_tool_internal(
        &self,
        tool_name: &str,
        input: serde_json::Value,
    ) -> Result<serde_json::Value> {
        // Find tool in registered tools by name
        let tool = self
            .options
            .tools()
            .iter()
            .find(|t| t.name() == tool_name)
            .ok_or_else(|| Error::tool(format!("Tool '{}' not found", tool_name)))?;

        // Execute the tool's async function
        tool.execute(input).await
    }

    /// Auto-execution loop that handles tool calls automatically.
    ///
    /// This is the core implementation of automatic tool execution mode. It:
    ///
    /// 1. Collects all blocks from the current stream
    /// 2. Separates text blocks from tool use blocks
    /// 3. If there are tool blocks:
    ///    - Executes PreToolUse hooks (can modify/block)
    ///    - Executes each tool via its registered function
    ///    - Executes PostToolUse hooks (can modify result)
    ///    - Adds results to history
    ///    - Continues conversation with send("")
    /// 4. Repeats until text-only response or max iterations
    /// 5. Returns all final text blocks
    ///
    /// # Returns
    ///
    /// - `Ok(blocks)`: Final text blocks after all tool iterations
    /// - `Err(e)`: Error during execution, stream processing, or interruption
    ///
    /// # Iteration Limit
    ///
    /// The loop is bounded by `options.max_tool_iterations()` to prevent infinite
    /// loops. If a response still requests tools after that many tool rounds have
    /// run, the loop stops and returns only the text blocks from that response.
    /// Its pending tool calls are neither executed nor recorded in history.
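    ///
    /// A minimal sketch of that counting rule, assuming a hypothetical `Response`
    /// enum and `run_bounded` function in place of the real stream and tools, and
    /// assuming a `u32` limit. It returns how many tool rounds ran and the final
    /// text, if the loop reached one:
    ///
    /// ```rust
    /// // Hypothetical model response: either tool calls or a final text answer.
    /// enum Response {
    ///     ToolCalls,
    ///     Text(String),
    /// }
    ///
    /// fn run_bounded(responses: Vec<Response>, max_iterations: u32) -> (u32, Option<String>) {
    ///     let mut rounds = 0; // Tool rounds actually executed
    ///     let mut iteration = 0;
    ///     for response in responses {
    ///         match response {
    ///             // Text-only response: the final answer
    ///             Response::Text(text) => return (rounds, Some(text)),
    ///             Response::ToolCalls => {
    ///                 // Count first, then compare: at most `max_iterations` rounds run
    ///                 iteration += 1;
    ///                 if iteration > max_iterations {
    ///                     return (rounds, None); // Limit hit: pending tools are skipped
    ///                 }
    ///                 rounds += 1; // Stand-in for executing this round's tools
    ///             }
    ///         }
    ///     }
    ///     (rounds, None) // Ran out of responses without a final answer
    /// }
    /// ```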
    ///
    /// # Hook Integration
    ///
    /// Hooks are executed for each tool call:
    /// - **PreToolUse**: Can modify input or block execution entirely
    /// - **PostToolUse**: Can modify the result before it's added to history
    ///
    /// If a hook blocks execution, a JSON error response is used as the tool result.
    ///
    /// # State Management
    ///
    /// The loop maintains history by adding:
    /// - Assistant messages with text + tool use blocks
    /// - User messages with tool result blocks
    ///
    /// This creates a proper conversation flow that the model can follow.
    ///
    /// # Error Recovery
    ///
    /// If a tool execution fails, the error is converted to a JSON error response
    /// and added as the tool result. This allows the conversation to continue
    /// and lets the model handle the error.
    async fn auto_execute_loop(&mut self) -> Result<Vec<ContentBlock>> {
        use crate::types::ToolResultBlock;

        // Track iterations to prevent infinite loops
        let mut iteration = 0;
        let max_iterations = self.options.max_tool_iterations();

        loop {
            // ========================================================================
            // STEP 1: Collect all blocks from current stream
            // ========================================================================
            // Buffer the entire response to determine if it contains tool calls
            let blocks = self.collect_all_blocks().await?;

            // Empty response means stream ended or was interrupted
            if blocks.is_empty() {
                return Ok(Vec::new());
            }

            // ========================================================================
            // STEP 2: Separate text blocks from tool use blocks
            // ========================================================================
            // The model can return a mix of text and tool calls in one response
            let mut text_blocks = Vec::new();
            let mut tool_blocks = Vec::new();

            for block in blocks {
                match block {
                    ContentBlock::Text(_) => text_blocks.push(block),
                    ContentBlock::ToolUse(_) => tool_blocks.push(block),
                    _ => {} // Ignore ToolResult and other variants
                }
            }

            // ========================================================================
            // STEP 3: Check if we're done (no tool calls)
            // ========================================================================
            // If the response contains no tool calls, we've reached the final answer
            if tool_blocks.is_empty() {
                // Add assistant's final text response to history
                if !text_blocks.is_empty() {
                    let assistant_msg = Message::assistant(text_blocks.clone());
                    self.history.push(assistant_msg);
                }
                // Return text blocks to caller via buffered receive()
                return Ok(text_blocks);
            }

            // ========================================================================
            // STEP 4: Check iteration limit BEFORE executing tools
            // ========================================================================
            // Increment counter and check if we've hit the max
            iteration += 1;
            if iteration > max_iterations {
                // Max iterations reached - stop execution and return what we have
                // This prevents infinite tool-calling loops
                if !text_blocks.is_empty() {
                    let assistant_msg = Message::assistant(text_blocks.clone());
                    self.history.push(assistant_msg);
                }
                return Ok(text_blocks);
            }

            // ========================================================================
            // STEP 5: Add assistant message to history
            // ========================================================================
            // The assistant message includes BOTH text and tool use blocks
            // This preserves the full context for future turns
            let mut all_blocks = text_blocks.clone();
            all_blocks.extend(tool_blocks.clone());
            let assistant_msg = Message::assistant(all_blocks);
            self.history.push(assistant_msg);

            // ========================================================================
            // STEP 6: Execute all tools and collect results
            // ========================================================================
            for block in tool_blocks {
                if let ContentBlock::ToolUse(tool_use) = block {
                    // Create simplified history snapshot for hooks
                    // TODO: Full serialization of history for hooks
                    let history_snapshot: Vec<serde_json::Value> =
                        self.history.iter().map(|_| serde_json::json!({})).collect();

                    // ============================================================
                    // Execute PreToolUse hooks
                    // ============================================================
                    use crate::hooks::PreToolUseEvent;
                    let pre_event = PreToolUseEvent::new(
                        tool_use.name.clone(),
                        tool_use.input.clone(),
                        tool_use.id.clone(),
                        history_snapshot.clone(),
                    );

                    // Track whether to execute and what input to use
                    let mut tool_input = tool_use.input.clone();
                    let mut should_execute = true;
                    let mut block_reason = None;

                    // Execute all PreToolUse hooks
                    if let Some(decision) =
                        self.options.hooks().execute_pre_tool_use(pre_event).await
                    {
                        if !decision.continue_execution() {
                            // Hook blocked execution
                            should_execute = false;
                            block_reason = decision.reason().map(|s| s.to_string());
                        } else if let Some(modified) = decision.modified_input() {
                            // Hook modified the input
                            tool_input = modified.clone();
                        }
                    }

                    // ============================================================
                    // Execute tool (or create error result if blocked)
                    // ============================================================
                    let result = if should_execute {
                        // Actually execute the tool
                        match self
                            .execute_tool_internal(&tool_use.name, tool_input.clone())
                            .await
                        {
                            Ok(res) => res, // Success - use the result
                            Err(e) => {
                                // Tool execution failed - convert to JSON error
                                // This allows the conversation to continue
                                serde_json::json!({
                                    "error": e.to_string(),
                                    "tool": tool_use.name,
                                    "id": tool_use.id
                                })
                            }
                        }
                    } else {
                        // Tool blocked by PreToolUse hook - create error result
                        serde_json::json!({
                            "error": "Tool execution blocked by hook",
                            "reason": block_reason.unwrap_or_else(|| "No reason provided".to_string()),
                            "tool": tool_use.name,
                            "id": tool_use.id
                        })
                    };

                    // ============================================================
                    // Execute PostToolUse hooks
                    // ============================================================
                    use crate::hooks::PostToolUseEvent;
                    let post_event = PostToolUseEvent::new(
                        tool_use.name.clone(),
                        tool_input,
                        tool_use.id.clone(),
                        result.clone(),
                        history_snapshot,
                    );

                    let mut final_result = result;
                    if let Some(decision) =
                        self.options.hooks().execute_post_tool_use(post_event).await
                    {
                        // PostToolUse can modify the result
                        // Note: Uses modified_input field (naming is historical)
                        if let Some(modified) = decision.modified_input() {
                            final_result = modified.clone();
                        }
1615                    }
1616
1617                    // ============================================================
1618                    // Add tool result to history
1619                    // ============================================================
                    // Tool results are stored in history as a user message wrapping a ToolResult block
                    let tool_result = ToolResultBlock::new(&tool_use.id, final_result);
                    let tool_result_msg =
                        Message::user_with_blocks(vec![ContentBlock::ToolResult(tool_result)]);
                    self.history.push(tool_result_msg);
                }
            }

            // ========================================================================
            // STEP 7: Continue conversation to get next response
            // ========================================================================
            // Send empty string to continue - the history contains all context
            self.send("").await?;

            // Loop continues to collect and process the next response
            // This will either be more tool calls or the final text answer
        }
    }
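
    // The PreToolUse handling in the loop above reduces to a three-way decision: a hook
    // may block the call, replace the input, or let it pass unchanged. A minimal sketch
    // of that decision follows, using only the standard library. `Decision` and
    // `resolve_input` are hypothetical names for illustration and are not part of this
    // crate's hook API; plain strings stand in for JSON inputs.
    //
    // ```rust
    // // Hypothetical stand-in for a hook decision.
    // enum Decision {
    //     Block(Option<String>),
    //     Modify(String),
    //     Continue,
    // }
    //
    // // Returns Ok(input to execute with) or Err(reason the call was blocked).
    // fn resolve_input(input: String, decision: Option<Decision>) -> Result<String, String> {
    //     match decision {
    //         Some(Decision::Block(reason)) => {
    //             Err(reason.unwrap_or_else(|| "No reason provided".to_string()))
    //         }
    //         Some(Decision::Modify(new_input)) => Ok(new_input),
    //         // No hook fired, or the hook allowed the call as-is
    //         Some(Decision::Continue) | None => Ok(input),
    //     }
    // }
    // ```
    //
    // Returning a `Result` keeps the blocked case explicit, so the caller can turn the
    // reason into an error payload instead of tracking separate flags.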

    /// Receives the next content block from the current stream.
    ///
    /// This is the primary method for consuming responses from the model. It works
    /// differently depending on the operating mode:
    ///
    /// ## Manual Mode (default)
    ///
    /// Streams blocks directly from the API response as they arrive. You receive:
    /// - `TextBlock`: Incremental text from the model
    /// - `ToolUseBlock`: Requests to execute tools
    /// - Other block types as they're emitted
    ///
    /// When you receive a `ToolUseBlock`, you must:
    /// 1. Execute the tool yourself
    /// 2. Call `add_tool_result()` with the result
    /// 3. Call `send("")` to continue the conversation
    ///
    /// ## Automatic Mode (`auto_execute_tools = true`)
    ///
    /// Transparently executes tools and only returns final text blocks. The first
    /// call to `receive()` triggers the auto-execution loop which:
    /// 1. Collects all blocks from the stream
    /// 2. Executes any tool calls automatically
    /// 3. Continues the conversation until reaching a text-only response
    /// 4. Buffers the final text blocks
    /// 5. Returns them one at a time on subsequent `receive()` calls
    ///
    /// # Returns
    ///
    /// - `Ok(Some(block))`: Successfully received a content block
    /// - `Ok(None)`: Stream ended normally or was interrupted
    /// - `Err(e)`: An error occurred during streaming or tool execution
    ///
    /// # Behavior Details
    ///
    /// ## Interruption
    ///
    /// Checks the interrupt flag on every call. If interrupted, immediately returns
    /// `Ok(None)` and clears the stream. The client can be reused after interruption.
    ///
    /// ## Stream Lifecycle
    ///
    /// 1. After `send()`, stream is active
    /// 2. Each `receive()` call yields one block
    /// 3. When stream ends, returns `Ok(None)`
    /// 4. Subsequent calls continue returning `Ok(None)` until next `send()`
    ///
    /// ## Auto-Execution Buffer
    ///
    /// In auto mode, the final blocks are buffered in memory and returned one per call.
    /// Once every block has been returned (index reaches length), `receive()` returns
    /// `Ok(None)`; `receive()` itself does not clear the buffer.
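    ///
    /// The cursor-over-buffer behavior described here can be sketched with the standard
    /// library alone. This is an illustration under assumptions, not the client's
    /// implementation: `ReplayBuffer` and `next_block` are hypothetical names, and
    /// `String` stands in for `ContentBlock`.
    ///
    /// ```rust
    /// // Hypothetical stand-in for the buffered-replay state.
    /// struct ReplayBuffer {
    ///     blocks: Vec<String>,
    ///     index: usize,
    /// }
    ///
    /// impl ReplayBuffer {
    ///     fn new(blocks: Vec<String>) -> Self {
    ///         Self { blocks, index: 0 }
    ///     }
    ///
    ///     // Returns the next buffered block and advances the cursor,
    ///     // or `None` once the cursor has reached the end of the buffer.
    ///     fn next_block(&mut self) -> Option<String> {
    ///         let block = self.blocks.get(self.index).cloned()?;
    ///         self.index += 1;
    ///         Some(block)
    ///     }
    /// }
    /// ```
    ///
    /// Using `get` rather than indexing means an exhausted buffer yields `None` on
    /// every later call instead of panicking.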
    ///
    /// # State Changes
    ///
    /// - Advances stream position
    /// - In auto mode: May trigger entire execution loop and modify history
    /// - In manual mode: Only reads from stream, no history changes
    /// - Increments `auto_exec_index` when returning buffered blocks
    ///
    /// # Examples
    ///
    /// ## Manual Mode - Basic
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Hello!").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     match block {
    ///         ContentBlock::Text(text) => print!("{}", text.text),
    ///         _ => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Manual Mode - With Tools
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # use serde_json::json;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Use the calculator").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     match block {
    ///         ContentBlock::Text(text) => {
    ///             println!("{}", text.text);
    ///         }
    ///         ContentBlock::ToolUse(tool_use) => {
    ///             println!("Executing: {}", tool_use.name);
    ///
    ///             // Execute tool manually
    ///             let result = json!({"result": 42});
    ///
    ///             // Add result and continue
    ///             client.add_tool_result(&tool_use.id, result)?;
    ///             client.send("").await?;
    ///         }
    ///         _ => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Auto Mode
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::builder()
    ///     .auto_execute_tools(true)
    ///     .build()?)?;
    ///
    /// client.send("Calculate 2+2").await?;
    ///
    /// // Tools execute automatically - you only get final text
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::Text(text) = block {
    ///         println!("{}", text.text);
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## With Error Handling
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Hello").await?;
    ///
    /// loop {
    ///     match client.receive().await {
    ///         Ok(Some(block)) => {
    ///             // Process block
    ///         }
    ///         Ok(None) => {
    ///             // Stream ended
    ///             break;
    ///         }
    ///         Err(e) => {
    ///             eprintln!("Error: {}", e);
    ///             break;
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn receive(&mut self) -> Result<Option<ContentBlock>> {
        // ========================================================================
        // AUTO-EXECUTION MODE
        // ========================================================================
        if self.options.auto_execute_tools() {
            // Check if we have buffered blocks to return
            // In auto mode, all final text blocks are buffered and returned one at a time
            if self.auto_exec_index < self.auto_exec_buffer.len() {
                // Return next buffered block
                let block = self.auto_exec_buffer[self.auto_exec_index].clone();
                self.auto_exec_index += 1;
                return Ok(Some(block));
            }

            // No buffered blocks - need to run auto-execution loop
            // This only happens on the first receive() call after send()
            if self.auto_exec_buffer.is_empty() {
                match self.auto_execute_loop().await {
                    Ok(blocks) => {
                        // Buffer all final text blocks
                        self.auto_exec_buffer = blocks;
                        self.auto_exec_index = 0;

                        // If no blocks, return None (empty response)
                        if self.auto_exec_buffer.is_empty() {
                            return Ok(None);
                        }

                        // Return first buffered block
                        let block = self.auto_exec_buffer[0].clone();
                        self.auto_exec_index = 1;
                        return Ok(Some(block));
                    }
                    Err(e) => return Err(e),
                }
            }

            // Buffer exhausted - return None
            Ok(None)
        } else {
            // ====================================================================
            // MANUAL MODE
            // ====================================================================
            // Stream blocks directly from API without buffering or auto-execution
            self.receive_one().await
        }
    }

    /// Interrupts the current operation by setting the interrupt flag.
    ///
    /// This method provides a thread-safe way to cancel any in-progress streaming
    /// operation. The interrupt flag is checked by `receive()` before each block,
    /// allowing responsive cancellation.
    ///
    /// # Behavior
    ///
    /// - Sets the atomic interrupt flag to `true`
    /// - Next `receive()` call will return `Ok(None)` and clear the stream
    /// - Flag is automatically reset to `false` on next `send()` call
    /// - Safe to call from any thread (uses atomic operations)
    /// - Idempotent: calling multiple times has same effect as calling once
    /// - No-op if no operation is in progress
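    ///
    /// The flag lifecycle in the list above (set, observed by the next receive, reset by
    /// the next send) can be modeled with the standard library alone. This is a sketch
    /// under assumptions, not this client's code: `MiniClient` is a hypothetical type,
    /// and a queue of integers stands in for the response stream.
    ///
    /// ```rust
    /// use std::collections::VecDeque;
    /// use std::sync::atomic::{AtomicBool, Ordering};
    /// use std::sync::Arc;
    ///
    /// // Hypothetical miniature client holding a shared interrupt flag.
    /// struct MiniClient {
    ///     interrupted: Arc<AtomicBool>,
    ///     stream: VecDeque<u32>,
    /// }
    ///
    /// impl MiniClient {
    ///     fn new() -> Self {
    ///         Self {
    ///             interrupted: Arc::new(AtomicBool::new(false)),
    ///             stream: VecDeque::new(),
    ///         }
    ///     }
    ///
    ///     // Starting a new request resets the flag and replaces the stream.
    ///     fn send(&mut self, items: Vec<u32>) {
    ///         self.interrupted.store(false, Ordering::SeqCst);
    ///         self.stream = items.into();
    ///     }
    ///
    ///     // Storing `true` twice leaves the same state as storing it once.
    ///     fn interrupt(&self) {
    ///         self.interrupted.store(true, Ordering::SeqCst);
    ///     }
    ///
    ///     // Checks the flag before yielding; an interrupt drops the rest of the stream.
    ///     fn receive(&mut self) -> Option<u32> {
    ///         if self.interrupted.load(Ordering::SeqCst) {
    ///             self.stream.clear();
    ///             return None;
    ///         }
    ///         self.stream.pop_front()
    ///     }
    /// }
    /// ```
    ///
    /// Because `interrupt` takes `&self`, it can be called while other shared borrows
    /// exist, and a cloned `Arc` of the flag can be moved into another thread or task.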
    ///
    /// # Thread Safety
    ///
    /// This method uses `Arc<AtomicBool>` internally, which can be safely shared
    /// across threads. You can clone the interrupt handle and use it from different
    /// threads or async tasks:
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    /// let interrupt_handle = client.interrupt_handle();
    ///
    /// // Use from another thread
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # State Changes
    ///
    /// - Sets `interrupted` flag to `true`
    /// - Does NOT modify stream, history, or other state directly
    /// - Effect takes place on next `receive()` call
    ///
    /// # Use Cases
    ///
    /// - User cancellation (e.g., stop button in UI)
    /// - Timeout enforcement
    /// - Resource cleanup
    /// - Emergency shutdown
    ///
    /// # Examples
    ///
    /// ## Basic Interruption
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// client.send("Tell me a long story").await?;
    ///
    /// // Interrupt after receiving some blocks
    /// let mut count = 0;
    /// while let Some(block) = client.receive().await? {
    ///     count += 1;
    ///     if count >= 5 {
    ///         client.interrupt();
    ///     }
    /// }
    ///
    /// // Client is ready for new queries
    /// client.send("What's 2+2?").await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## With Timeout
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions};
    /// use std::time::Duration;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// client.send("Long request").await?;
    ///
    /// // Spawn timeout task
    /// let interrupt_handle = client.interrupt_handle();
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(Duration::from_secs(10)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    ///
    /// while let Some(_block) = client.receive().await? {
    ///     // Process until timeout
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn interrupt(&self) {
        // Set interrupt flag using SeqCst for immediate visibility across all threads
        self.interrupted.store(true, Ordering::SeqCst);
    }

    /// Returns a clone of the interrupt handle for thread-safe cancellation.
    ///
    /// This method provides access to the shared `Arc<AtomicBool>` interrupt flag,
    /// allowing it to be used from other threads or async tasks to signal cancellation.
    ///
    /// # Returns
    ///
    /// A cloned `Arc<AtomicBool>` that can be used to interrupt operations from any thread.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    /// let interrupt_handle = client.interrupt_handle();
    ///
    /// // Use from another thread
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    /// # Ok(())
    /// # }
    /// ```
    pub fn interrupt_handle(&self) -> Arc<AtomicBool> {
        self.interrupted.clone()
    }

    /// Returns a reference to the conversation history.
    ///
    /// The history contains all messages exchanged in the conversation, including:
    /// - User messages
    /// - Assistant messages (with text and tool use blocks)
    /// - Tool result messages
    ///
    /// # Returns
    ///
    /// A slice of `Message` objects in chronological order.
    ///
    /// # Use Cases
    ///
    /// - Inspecting conversation context
    /// - Debugging tool execution flow
    /// - Saving conversation state
    /// - Implementing custom history management
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::default())?;
    ///
    /// // Initially empty
    /// assert_eq!(client.history().len(), 0);
    /// # Ok(())
    /// # }
    /// ```
    pub fn history(&self) -> &[Message] {
        &self.history
    }

    /// Returns a mutable reference to the conversation history.
    ///
    /// This allows you to modify the history directly for advanced use cases like:
    /// - Removing old messages to manage context length
    /// - Editing messages for retry scenarios
    /// - Injecting synthetic messages for testing
    ///
    /// # Warning
    ///
    /// Modifying history directly can lead to inconsistent conversation state if not
    /// done carefully. The SDK expects history to follow the proper message flow
    /// (user → assistant → tool results → assistant, etc.).
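    ///
    /// One possible trimming policy that respects this flow is to cut at a turn boundary
    /// rather than at a fixed index. The sketch below uses only the standard library;
    /// `Role` and `trim_history` are hypothetical names, and a bare role enum stands in
    /// for `Message`. It assumes every turn starts with a user message.
    ///
    /// ```rust
    /// // Hypothetical stand-in for a message, reduced to its role.
    /// #[derive(Debug, Clone, Copy, PartialEq)]
    /// enum Role {
    ///     User,
    ///     Assistant,
    ///     Tool,
    /// }
    ///
    /// // Drops the oldest messages so that at most `max` remain, then moves the cut
    /// // forward to the next user message so the kept history starts a fresh turn.
    /// fn trim_history(history: &mut Vec<Role>, max: usize) {
    ///     if history.len() <= max {
    ///         return;
    ///     }
    ///     let excess = history.len() - max;
    ///     let cut = history[excess..]
    ///         .iter()
    ///         .position(|role| *role == Role::User)
    ///         .map_or(history.len(), |offset| excess + offset);
    ///     history.drain(0..cut);
    /// }
    /// ```
    ///
    /// The result may hold fewer than `max` messages, and it is empty when no user
    /// message remains in the kept tail.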
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// // Remove oldest messages to stay within context limit
    /// if client.history().len() > 50 {
    ///     client.history_mut().drain(0..10);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn history_mut(&mut self) -> &mut Vec<Message> {
        &mut self.history
    }

    /// Returns a reference to the agent configuration options.
    ///
    /// Provides read-only access to the `AgentOptions` used to configure this client.
    ///
    /// # Use Cases
    ///
    /// - Inspecting current configuration
    /// - Debugging issues
    /// - Conditional logic based on settings
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::builder()
    ///     .model("gpt-4")
    ///     .base_url("http://localhost:1234/v1")
    ///     .build()?)?;
    ///
    /// println!("Using model: {}", client.options().model());
    /// # Ok(())
    /// # }
    /// ```
    pub fn options(&self) -> &AgentOptions {
        &self.options
    }

    /// Clears all conversation history.
    ///
    /// This resets the conversation to a blank slate while preserving the client
    /// configuration (tools, hooks, model, etc.). The next message will start a
    /// fresh conversation with no prior context.
    ///
    /// # State Changes
    ///
    /// - Clears `history` vector
    /// - Does NOT modify current stream, options, or other state
    ///
    /// # Use Cases
    ///
    /// - Starting a new conversation
    /// - Preventing context length issues
    /// - Clearing sensitive data
    /// - Implementing conversation sessions
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// // First conversation
    /// client.send("Hello").await?;
    /// while let Some(_) = client.receive().await? {}
    ///
    /// // Clear and start fresh
    /// client.clear_history();
    ///
    /// // New conversation with no memory of previous
    /// client.send("Hello again").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn clear_history(&mut self) {
        self.history.clear();
    }

    /// Adds a tool result to the conversation history for manual tool execution.
    ///
    /// This method is used exclusively in **manual mode** after receiving a `ToolUseBlock`.
    /// The workflow is:
    ///
    /// 1. `receive()` returns a `ToolUseBlock`
    /// 2. You execute the tool yourself
    /// 3. Call `add_tool_result()` with the tool's output
    /// 4. Call `send("")` to continue the conversation
    /// 5. The model receives the tool result and generates a response
    ///
    /// # Parameters
    ///
    /// - `tool_use_id`: The unique ID from the `ToolUseBlock` (must match exactly)
    /// - `content`: The tool's output as a JSON value
    ///
    /// # Behavior
    ///
    /// Creates a `ToolResultBlock`, serializes its content to a JSON string, and appends it
    /// to conversation history as a `MessageRole::Tool` message holding a single `TextBlock`.
    /// Placing the result after the tool call keeps the ordering the model needs to
    /// understand the conversation flow.
    ///
    /// # State Changes
    ///
    /// - Appends a tool message to `history`
    /// - Does NOT modify stream or trigger any requests
    ///
    /// # Important Notes
    ///
    /// - **Not used in auto mode**: Auto-execution handles tool results automatically
    /// - **ID must match**: The `tool_use_id` must match the ID from the `ToolUseBlock`
    /// - **No validation**: This method doesn't validate the result format
    /// - **Must call send()**: After adding result(s), call `send("")` to continue
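    ///
    /// Since results are not validated, a caller handling several tool calls may want to
    /// confirm that each call has a result before continuing. A minimal sketch of such a
    /// check, using only the standard library, is shown here; `missing_results` is a
    /// hypothetical helper and not part of this crate.
    ///
    /// ```rust
    /// use std::collections::HashSet;
    ///
    /// // Returns the IDs of tool calls that have no recorded result yet,
    /// // in the order the calls were received.
    /// fn missing_results<'a>(call_ids: &[&'a str], result_ids: &[&str]) -> Vec<&'a str> {
    ///     let done: HashSet<&str> = result_ids.iter().copied().collect();
    ///     call_ids
    ///         .iter()
    ///         .copied()
    ///         .filter(|id| !done.contains(*id))
    ///         .collect()
    /// }
    /// ```
    ///
    /// An empty return value means every collected call has a matching result and the
    /// conversation can continue.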
    ///
    /// # Examples
    ///
    /// ## Basic Manual Tool Execution
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Use the calculator").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     match block {
    ///         ContentBlock::ToolUse(tool_use) => {
    ///             // Execute tool manually
    ///             let result = json!({"result": 42});
    ///
    ///             // Add result to history
    ///             client.add_tool_result(&tool_use.id, result)?;
    ///
    ///             // Continue conversation to get model's response
    ///             client.send("").await?;
    ///         }
    ///         ContentBlock::Text(text) => {
    ///             println!("{}", text.text);
    ///         }
    ///         _ => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Handling Tool Errors
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// # client.send("test").await?;
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         // Try to execute tool
    ///         let result = match execute_tool(&tool_use.name, &tool_use.input) {
    ///             Ok(output) => output,
    ///             Err(e) => json!({
    ///                 "error": e.to_string(),
    ///                 "tool": tool_use.name
    ///             })
    ///         };
    ///
    ///         client.add_tool_result(&tool_use.id, result)?;
    ///         client.send("").await?;
    ///     }
    /// }
    ///
    /// # fn execute_tool(name: &str, input: &serde_json::Value) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
    /// #     Ok(json!({}))
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Multiple Tool Calls
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Calculate 2+2 and 3+3").await?;
    ///
    /// let mut tool_calls = Vec::new();
    ///
    /// // Collect all tool calls
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         tool_calls.push(tool_use);
    ///     }
    /// }
    ///
    /// // Execute and add results for all tools
    /// for tool_call in tool_calls {
    ///     let result = json!({"result": 42}); // Execute tool
    ///     client.add_tool_result(&tool_call.id, result)?;
    /// }
    ///
    /// // Continue conversation
    /// client.send("").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn add_tool_result(&mut self, tool_use_id: &str, content: serde_json::Value) -> Result<()> {
        use crate::types::ToolResultBlock;

        // Create a tool result block with the given ID and content
        let result_block = ToolResultBlock::new(tool_use_id, content);

        // Add to history as a tool message
        // Note: This is a simplified representation. Only the result content is
        // serialized into a TextBlock; the tool_use_id is not carried into the message.
        // TODO: Properly serialize ToolResultBlock for OpenAI format
        let serialized = serde_json::to_string(&result_block.content)
            .map_err(|e| Error::config(format!("Failed to serialize tool result: {}", e)))?;

        self.history.push(Message::new(
            MessageRole::Tool,
            vec![ContentBlock::Text(TextBlock::new(serialized))],
        ));

        Ok(())
    }

    /// Looks up a registered tool by name.
    ///
    /// This method provides access to the tool registry for manual execution scenarios.
    /// It searches the tools registered in `AgentOptions` and returns a reference to
    /// the matching tool if found.
    ///
    /// # Parameters
    ///
    /// - `name`: The tool name to search for (case-sensitive)
    ///
    /// # Returns
    ///
    /// - `Some(&Tool)`: Tool found
    /// - `None`: No tool with that name
    ///
    /// # Use Cases
    ///
    /// - Manual tool execution in response to `ToolUseBlock`
    /// - Validating tool availability before offering features
    /// - Inspecting tool metadata (name, description, schema)
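    ///
    /// The lookup itself is a linear, case-sensitive search over the registered tools.
    /// The sketch below illustrates that pattern with the standard library only. `Tool`
    /// here is a hypothetical stand-in with just a name, `find_tool` is not part of
    /// this crate, and storing tools behind `Arc` is an assumption.
    ///
    /// ```rust
    /// use std::sync::Arc;
    ///
    /// // Hypothetical stand-in for a registered tool.
    /// struct Tool {
    ///     name: String,
    /// }
    ///
    /// impl Tool {
    ///     fn name(&self) -> &str {
    ///         &self.name
    ///     }
    /// }
    ///
    /// // Returns the first tool whose name matches exactly, borrowing from the registry.
    /// fn find_tool<'a>(tools: &'a [Arc<Tool>], name: &str) -> Option<&'a Tool> {
    ///     tools
    ///         .iter()
    ///         .find(|tool| tool.name() == name)
    ///         .map(|tool| tool.as_ref())
    /// }
    /// ```
    ///
    /// A linear scan is usually adequate for a handful of tools; a map keyed by name
    /// would be the alternative if the registry grew large.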
    ///
    /// # Examples
    ///
    /// ## Execute Tool Manually
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// # client.send("test").await?;
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         if let Some(tool) = client.get_tool(&tool_use.name) {
    ///             // Execute the tool
    ///             let result = tool.execute(tool_use.input.clone()).await?;
    ///             client.add_tool_result(&tool_use.id, result)?;
    ///             client.send("").await?;
    ///         } else {
    ///             println!("Unknown tool: {}", tool_use.name);
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Check Tool Availability
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let client = Client::new(AgentOptions::default())?;
    /// if client.get_tool("calculator").is_some() {
    ///     println!("Calculator is available");
    /// }
    /// # Ok(())
    /// # }
    /// ```
2322    pub fn get_tool(&self, name: &str) -> Option<&crate::tools::Tool> {
2323        // Search registered tools by name
2324        self.options
2325            .tools()
2326            .iter()
2327            .find(|t| t.name() == name)
2328            .map(|t| t.as_ref())
2329    }
2330}
2331
2332#[cfg(test)]
2333mod tests {
2334    use super::*;
2335
2336    #[test]
2337    fn test_client_creation() {
2338        let options = AgentOptions::builder()
2339            .system_prompt("Test")
2340            .model("test-model")
2341            .base_url("http://localhost:1234/v1")
2342            .build()
2343            .unwrap();
2344
2345        let client = Client::new(options).expect("Should create client successfully");
2346        assert_eq!(client.history().len(), 0);
2347    }
2348
2349    #[test]
2350    fn test_client_new_returns_result() {
2351        // Test that Client::new() returns Result instead of panicking
2352        let options = AgentOptions::builder()
2353            .system_prompt("Test")
2354            .model("test-model")
2355            .base_url("http://localhost:1234/v1")
2356            .build()
2357            .unwrap();
2358
2359        // This should not panic - it should return Ok(client)
2360        let result = Client::new(options);
2361        assert!(result.is_ok(), "Client::new() should return Ok");
2362
2363        let client = result.unwrap();
2364        assert_eq!(client.history().len(), 0);
2365    }
2366
2367    #[test]
    fn test_interrupt_flag_initial_state() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        // Initially not interrupted
        assert!(!client.interrupted.load(Ordering::SeqCst));
    }

    #[test]
    fn test_interrupt_sets_flag() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));
    }

    #[test]
    fn test_interrupt_idempotent() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));

        // Call again - should still be interrupted
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_receive_returns_none_when_interrupted() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // Interrupt before receiving
        client.interrupt();

        // receive() should return Ok(None) when interrupted
        let result = client.receive().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_receive_returns_ok_none_when_no_stream() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // No stream started - receive() should return Ok(None)
        let result = client.receive().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_receive_error_propagation() {
        // Checks the shape of receive()'s return type: errors surface as Err(..),
        // not as Some(Err(..)). Exercising a real error path still needs a mock
        // stream that produces errors.
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // receive() returns Result<Option<ContentBlock>>, so callers can use `?` cleanly:
        // while let Some(block) = client.receive().await? { ... }

        // The type annotation fails to compile if the signature changes; with no
        // active stream, the call itself yields Ok(None).
        let result: Result<Option<ContentBlock>> = client.receive().await;
        assert!(matches!(result, Ok(None)));
    }
}