open_agent/
client.rs

1//! Client for streaming queries and multi-turn conversations
2//!
3//! This module provides the core streaming client implementation for the Open Agent SDK.
4//! It handles communication with OpenAI-compatible APIs, manages conversation history,
5//! and provides two modes of operation: manual and automatic tool execution.
6//!
7//! # Architecture Overview
8//!
9//! The SDK implements a **streaming-first architecture** where all responses from the model
10//! are received as a stream of content blocks. This design enables:
11//!
12//! - **Progressive rendering**: Display text as it arrives without waiting for completion
13//! - **Real-time tool execution**: Execute tools as they're requested by the model
14//! - **Interruption support**: Cancel operations mid-stream without corrupting state
15//! - **Memory efficiency**: Process large responses without buffering everything in memory
16//!
17//! ## Two Operating Modes
18//!
19//! ### 1. Manual Tool Execution Mode (default)
20//!
21//! In manual mode, the client streams content blocks directly to the caller. When the model
22//! requests a tool, the caller receives a `ToolUseBlock`, executes the tool, adds the result
23//! using `add_tool_result()`, and continues the conversation with another `send()` call.
24//!
25//! **Use cases**: Custom tool execution logic, interactive debugging, fine-grained control
26//!
27//! ### 2. Automatic Tool Execution Mode
28//!
29//! When `auto_execute_tools` is enabled, the client automatically executes tools and continues
30//! the conversation until receiving a text-only response. The caller only receives the final
31//! text blocks after all tool iterations complete.
32//!
33//! **Use cases**: Simple agentic workflows, automated task completion, batch processing
34//!
35//! ## Request Flow
36//!
37//! ```text
38//! User sends prompt
39//!     │
40//!     ├─> UserPromptSubmit hook executes (can modify/block prompt)
41//!     │
42//!     ├─> Prompt added to history
43//!     │
44//!     ├─> HTTP request to OpenAI-compatible API
45//!     │
46//!     ├─> Response streamed as Server-Sent Events (SSE)
47//!     │
48//!     ├─> SSE chunks aggregated into ContentBlocks
49//!     │
50//!     └─> Blocks emitted to caller (or buffered for auto-execution)
51//! ```
52//!
53//! ## Tool Execution Flow
54//!
55//! ### Manual Mode:
56//! ```text
57//! receive() → ToolUseBlock
58//!     │
59//!     ├─> Caller executes tool
60//!     │
61//!     ├─> Caller calls add_tool_result()
62//!     │
63//!     ├─> Caller calls send("") to continue
64//!     │
65//!     └─> receive() → TextBlock (model's response)
66//! ```
67//!
68//! ### Auto Mode:
69//! ```text
70//! receive() triggers auto-execution loop
71//!     │
72//!     ├─> Collect all blocks from stream
73//!     │
74//!     ├─> For each ToolUseBlock:
75//!     │   ├─> PreToolUse hook executes (can modify/block)
76//!     │   ├─> Tool executed via Tool.execute()
77//!     │   ├─> PostToolUse hook executes (can modify result)
78//!     │   └─> Result added to history
79//!     │
80//!     ├─> Continue conversation with send("")
81//!     │
82//!     ├─> Repeat until text-only response or max iterations
83//!     │
84//!     └─> Return text blocks one-by-one via receive()
85//! ```
86//!
87//! ## State Management
88//!
89//! The client maintains several pieces of state:
90//!
91//! - **history**: Full conversation history (`Vec<Message>`)
92//! - **current_stream**: Active SSE stream being consumed (`Option<ContentStream>`)
93//! - **interrupted**: Atomic flag for cancellation (`Arc<AtomicBool>`)
94//! - **auto_exec_buffer**: Buffered blocks for auto-execution mode (`Vec<ContentBlock>`)
95//! - **auto_exec_index**: Current position in buffer (usize)
96//!
97//! ## Interruption Mechanism
98//!
99//! The interrupt system uses `Arc<AtomicBool>` to enable safe, thread-safe cancellation:
100//!
101//! ```rust,no_run
102//! # use open_agent::{Client, AgentOptions};
103//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
104//! let mut client = Client::new(AgentOptions::default())?;
105//! let handle = client.interrupt_handle(); // Clone Arc for use in other threads
106//!
107//! // In another thread or async task:
108//! tokio::spawn(async move {
109//!     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
110//!     handle.store(true, std::sync::atomic::Ordering::SeqCst);
111//! });
112//!
113//! client.send("Long request").await?;
114//! while let Some(block) = client.receive().await? {
115//!     // Will stop when interrupted
116//! }
117//! # Ok(())
118//! # }
119//! ```
120//!
121//! ## Hook Integration
122//!
123//! Hooks provide extension points throughout the request lifecycle:
124//!
125//! - **UserPromptSubmit**: Called before sending user prompt (can modify or block)
126//! - **PreToolUse**: Called before executing each tool (can modify input or block execution)
127//! - **PostToolUse**: Called after tool execution (can modify result)
128//!
129//! Hooks are only invoked in specific scenarios and have access to conversation history.
130//!
131//! ## Error Handling
132//!
133//! Errors are propagated immediately and leave the client in a valid state:
134//!
135//! - **HTTP errors**: Network failures, timeouts, connection issues
136//! - **API errors**: Invalid model, authentication failures, rate limits
137//! - **Parse errors**: Malformed SSE responses, invalid JSON
138//! - **Tool errors**: Tool execution failures (converted to JSON error responses)
139//! - **Hook errors**: Hook execution failures or blocked operations
140//!
141//! After an error, the client remains usable for new requests.
142//!
143//! # Examples
144//!
145//! ## Simple Single-Turn Query
146//!
147//! ```rust,no_run
148//! use open_agent::{query, AgentOptions};
149//! use futures::StreamExt;
150//!
151//! #[tokio::main]
152//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
153//!     let options = AgentOptions::builder()
154//!         .model("gpt-4")
155//!         .api_key("sk-...")
156//!         .build()?;
157//!
158//!     let mut stream = query("What is Rust?", &options).await?;
159//!
160//!     while let Some(block) = stream.next().await {
161//!         if let open_agent::ContentBlock::Text(text) = block? {
162//!             print!("{}", text.text);
163//!         }
164//!     }
165//!
166//!     Ok(())
167//! }
168//! ```
169//!
170//! ## Multi-Turn Conversation
171//!
172//! ```rust,no_run
173//! use open_agent::{Client, AgentOptions, ContentBlock};
174//! use futures::StreamExt;
175//!
176//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
177//! let mut client = Client::new(AgentOptions::builder()
178//!     .model("gpt-4")
179//!     .api_key("sk-...")
180//!     .build()?)?;
181//!
182//! // First question
183//! client.send("What's the capital of France?").await?;
184//! while let Some(block) = client.receive().await? {
185//!     if let ContentBlock::Text(text) = block {
186//!         println!("{}", text.text);
187//!     }
188//! }
189//!
190//! // Follow-up question (history is maintained)
191//! client.send("What's its population?").await?;
192//! while let Some(block) = client.receive().await? {
193//!     if let ContentBlock::Text(text) = block {
194//!         println!("{}", text.text);
195//!     }
196//! }
197//! # Ok(())
198//! # }
199//! ```
200//!
201//! ## Manual Tool Execution
202//!
203//! ```rust,no_run
204//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
205//! use serde_json::json;
206//!
207//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
208//! let calculator = Tool::new(
209//!     "calculator",
210//!     "Performs arithmetic operations",
211//!     json!({"type": "object", "properties": {"operation": {"type": "string"}}}),
212//!     |input| Box::pin(async move {
213//!         // Custom execution logic
214//!         Ok(json!({"result": 42}))
215//!     })
216//! );
217//!
218//! let mut client = Client::new(AgentOptions::builder()
219//!     .model("gpt-4")
220//!     .api_key("sk-...")
221//!     .tools(vec![calculator])
222//!     .build()?)?;
223//!
224//! client.send("Calculate 2+2").await?;
225//!
226//! while let Some(block) = client.receive().await? {
227//!     match block {
228//!         ContentBlock::ToolUse(tool_use) => {
229//!             println!("Model wants to use: {}", tool_use.name);
230//!
231//!             // Execute tool manually
232//!             let tool = client.get_tool(&tool_use.name).unwrap();
233//!             let result = tool.execute(tool_use.input).await?;
234//!
235//!             // Add result and continue
236//!             client.add_tool_result(&tool_use.id, result)?;
237//!             client.send("").await?;
238//!         }
239//!         ContentBlock::Text(text) => {
240//!             println!("Response: {}", text.text);
241//!         }
242//!         _ => {}
243//!     }
244//! }
245//! # Ok(())
246//! # }
247//! ```
248//!
249//! ## Automatic Tool Execution
250//!
251//! ```rust,no_run
252//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
253//! use serde_json::json;
254//!
255//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
256//! let calculator = Tool::new(
257//!     "calculator",
258//!     "Performs arithmetic operations",
259//!     json!({"type": "object"}),
260//!     |input| Box::pin(async move { Ok(json!({"result": 42})) })
261//! );
262//!
263//! let mut client = Client::new(AgentOptions::builder()
264//!     .model("gpt-4")
265//!     .api_key("sk-...")
266//!     .tools(vec![calculator])
267//!     .auto_execute_tools(true)  // Enable auto-execution
268//!     .max_tool_iterations(5)    // Max 5 tool rounds
269//!     .build()?)?;
270//!
271//! client.send("Calculate 2+2 and then multiply by 3").await?;
272//!
273//! // Tools are executed automatically - you only get final text response
274//! while let Some(block) = client.receive().await? {
275//!     if let ContentBlock::Text(text) = block {
276//!         println!("{}", text.text);
277//!     }
278//! }
279//! # Ok(())
280//! # }
281//! ```
282//!
283//! ## With Hooks
284//!
285//! ```ignore
286//! use open_agent::{Client, AgentOptions, Hooks, HookDecision};
287//!
288//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
289//! let hooks = Hooks::new()
290//!     .add_user_prompt_submit(|event| async move {
291//!         // Block prompts containing certain words
292//!         if event.prompt.contains("forbidden") {
293//!             return Some(HookDecision::block("Forbidden word detected"));
294//!         }
295//!         Some(HookDecision::continue_())
296//!     })
297//!     .add_pre_tool_use(|event| async move {
298//!         // Log all tool uses
299//!         println!("Executing tool: {}", event.tool_name);
300//!         Some(HookDecision::continue_())
301//!     });
302//!
303//! let mut client = Client::new(AgentOptions::builder()
304//!     .model("gpt-4")
305//!     .base_url("http://localhost:1234/v1")
306//!     .hooks(hooks)
307//!     .build()?)?;
308//!
309//! // Hooks will be executed automatically
310//! client.send("Hello!").await?;
311//! # Ok(())
312//! # }
313//! ```
314
315use crate::types::{
316    AgentOptions, ContentBlock, Message, MessageRole, OpenAIMessage, OpenAIRequest, TextBlock,
317};
318use crate::utils::{ToolCallAggregator, parse_sse_stream};
319use crate::{Error, Result};
320use futures::stream::{Stream, StreamExt};
321use std::pin::Pin;
322use std::sync::Arc;
323use std::sync::atomic::{AtomicBool, Ordering};
324use std::time::Duration;
325
326/// A pinned, boxed stream of content blocks from the model.
327///
328/// This type alias represents an asynchronous stream that yields `ContentBlock` items.
329/// Each item is wrapped in a `Result` to handle potential errors during streaming.
330///
331/// The stream is:
332/// - **Pinned** (`Pin<Box<...>>`): Required for safe async operations and self-referential types
333/// - **Boxed**: Allows dynamic dispatch and hides the concrete stream implementation
334/// - **Send**: Can be safely transferred between threads
335///
336/// # Content Blocks
337///
338/// The stream can yield several types of content blocks:
339///
340/// - **TextBlock**: Incremental text responses from the model
341/// - **ToolUseBlock**: Requests to execute a tool with specific parameters
342/// - **ToolResultBlock**: Results from tool execution (in manual mode)
343///
344/// # Error Handling
345///
346/// Errors in the stream indicate issues like:
347/// - Network failures or timeouts
348/// - Malformed SSE events
349/// - JSON parsing errors
350/// - API errors from the model provider
351///
352/// When an error occurs, the stream typically terminates. It's the caller's responsibility
353/// to handle errors appropriately.
354///
355/// # Examples
356///
357/// ```rust,no_run
358/// use open_agent::{query, AgentOptions, ContentBlock};
359/// use futures::StreamExt;
360///
361/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
362/// let options = AgentOptions::builder()
363///     .model("gpt-4")
364///     .api_key("sk-...")
365///     .build()?;
366///
367/// let mut stream = query("Hello!", &options).await?;
368///
369/// while let Some(result) = stream.next().await {
370///     match result {
371///         Ok(ContentBlock::Text(text)) => print!("{}", text.text),
372///         Ok(_) => {}, // Other block types
373///         Err(e) => eprintln!("Stream error: {}", e),
374///     }
375/// }
376/// # Ok(())
377/// # }
378/// ```
379pub type ContentStream = Pin<Box<dyn Stream<Item = Result<ContentBlock>> + Send>>;
380
381/// Simple query function for single-turn interactions without conversation history.
382///
383/// This is a stateless convenience function for simple queries that don't require
384/// multi-turn conversations. It creates a temporary HTTP client, sends a single
385/// prompt, and returns a stream of content blocks.
386///
387/// For multi-turn conversations or more control over the interaction, use [`Client`] instead.
388///
389/// # Parameters
390///
391/// - `prompt`: The user's message to send to the model
392/// - `options`: Configuration including model, API key, tools, etc.
393///
394/// # Returns
395///
396/// Returns a `ContentStream` that yields content blocks as they arrive from the model.
397/// The stream must be polled to completion to receive all blocks.
398///
399/// # Behavior
400///
401/// 1. Creates a temporary HTTP client with configured timeout
402/// 2. Builds message array (system prompt + user prompt)
403/// 3. Converts tools to OpenAI format if provided
404/// 4. Makes HTTP POST request to `/chat/completions`
405/// 5. Parses Server-Sent Events (SSE) response stream
406/// 6. Aggregates chunks into complete content blocks
407/// 7. Returns stream that yields blocks as they complete
408///
409/// # Error Handling
410///
411/// This function can return errors for:
412/// - HTTP client creation failures
413/// - Network errors during the request
414/// - API errors (authentication, invalid model, rate limits, etc.)
415/// - SSE parsing errors
416/// - JSON deserialization errors
417///
418/// # Performance Notes
419///
420/// - Creates a new HTTP client for each call (consider using `Client` for repeated queries)
421/// - Timeout is configurable via `AgentOptions::timeout` (default: 120 seconds)
422/// - Streaming begins immediately; no buffering of the full response
423///
424/// # Examples
425///
426/// ## Basic Usage
427///
428/// ```rust,no_run
429/// use open_agent::{query, AgentOptions};
430/// use futures::StreamExt;
431///
432/// #[tokio::main]
433/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
434///     let options = AgentOptions::builder()
435///         .system_prompt("You are a helpful assistant")
436///         .model("gpt-4")
437///         .api_key("sk-...")
438///         .build()?;
439///
440///     let mut stream = query("What's the capital of France?", &options).await?;
441///
442///     while let Some(block) = stream.next().await {
443///         match block? {
444///             open_agent::ContentBlock::Text(text) => {
445///                 print!("{}", text.text);
446///             }
447///             _ => {}
448///         }
449///     }
450///
451///     Ok(())
452/// }
453/// ```
454///
455/// ## With Tools
456///
457/// ```rust,no_run
458/// use open_agent::{query, AgentOptions, Tool, ContentBlock};
459/// use futures::StreamExt;
460/// use serde_json::json;
461///
462/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
463/// let calculator = Tool::new(
464///     "calculator",
465///     "Performs calculations",
466///     json!({"type": "object"}),
467///     |input| Box::pin(async move { Ok(json!({"result": 42})) })
468/// );
469///
470/// let options = AgentOptions::builder()
471///     .model("gpt-4")
472///     .api_key("sk-...")
473///     .tools(vec![calculator])
474///     .build()?;
475///
476/// let mut stream = query("Calculate 2+2", &options).await?;
477///
478/// while let Some(block) = stream.next().await {
479///     match block? {
480///         ContentBlock::ToolUse(tool_use) => {
481///             println!("Model wants to use: {}", tool_use.name);
482///             // Note: You'll need to manually execute tools and continue
483///             // the conversation. For automatic execution, use Client.
484///         }
485///         ContentBlock::Text(text) => print!("{}", text.text),
486///         _ => {}
487///     }
488/// }
489/// # Ok(())
490/// # }
491/// ```
492///
493/// ## Error Handling
494///
495/// ```rust,no_run
496/// use open_agent::{query, AgentOptions};
497/// use futures::StreamExt;
498///
499/// # async fn example() {
500/// let options = AgentOptions::builder()
501///     .model("gpt-4")
502///     .api_key("invalid-key")
503///     .build()
504///     .unwrap();
505///
506/// match query("Hello", &options).await {
507///     Ok(mut stream) => {
508///         while let Some(result) = stream.next().await {
509///             match result {
510///                 Ok(block) => println!("Block: {:?}", block),
511///                 Err(e) => {
512///                     eprintln!("Stream error: {}", e);
513///                     break;
514///                 }
515///             }
516///         }
517///     }
518///     Err(e) => eprintln!("Query failed: {}", e),
519/// }
520/// # }
521/// ```
522pub async fn query(prompt: &str, options: &AgentOptions) -> Result<ContentStream> {
523    // Create HTTP client with configured timeout
524    // The timeout applies to the entire request, not individual chunks
525    let client = reqwest::Client::builder()
526        .timeout(Duration::from_secs(options.timeout()))
527        .build()
528        .map_err(Error::Http)?;
529
530    // Build messages array for the API request
531    // OpenAI format expects an array of message objects with role and content
532    let mut messages = Vec::new();
533
534    // Add system prompt if provided
535    // System prompts set the assistant's behavior and context
536    if !options.system_prompt().is_empty() {
537        messages.push(OpenAIMessage {
538            role: "system".to_string(),
539            content: options.system_prompt().to_string(),
540            tool_calls: None,
541            tool_call_id: None,
542        });
543    }
544
545    // Add user prompt
546    // This is the actual query from the user
547    messages.push(OpenAIMessage {
548        role: "user".to_string(),
549        content: prompt.to_string(),
550        tool_calls: None,
551        tool_call_id: None,
552    });
553
554    // Convert tools to OpenAI format if any are provided
555    // Tools are described using JSON Schema for parameter validation
556    let tools = if !options.tools().is_empty() {
557        Some(
558            options
559                .tools()
560                .iter()
561                .map(|t| t.to_openai_format())
562                .collect(),
563        )
564    } else {
565        None
566    };
567
568    // Build the OpenAI-compatible request payload
569    // stream=true enables Server-Sent Events for incremental responses
570    let request = OpenAIRequest {
571        model: options.model().to_string(),
572        messages,
573        stream: true, // Critical: enables SSE streaming
574        max_tokens: options.max_tokens(),
575        temperature: Some(options.temperature()),
576        tools,
577    };
578
579    // Make HTTP POST request to the chat completions endpoint
580    let url = format!("{}/chat/completions", options.base_url());
581    let response = client
582        .post(&url)
583        .header("Authorization", format!("Bearer {}", options.api_key()))
584        .header("Content-Type", "application/json")
585        .json(&request)
586        .send()
587        .await
588        .map_err(Error::Http)?;
589
590    // Check for HTTP-level errors before processing the stream
591    // This catches authentication failures, rate limits, invalid models, etc.
592    if !response.status().is_success() {
593        let status = response.status();
594        let body = response.text().await.unwrap_or_else(|e| {
595            eprintln!("WARNING: Failed to read error response body: {}", e);
596            "Unknown error (failed to read response body)".to_string()
597        });
598        return Err(Error::api(format!("API error {}: {}", status, body)));
599    }
600
601    // Parse the Server-Sent Events (SSE) stream
602    // The response body is a stream of "data: {...}" events
603    let sse_stream = parse_sse_stream(response);
604
605    // Aggregate SSE chunks into complete content blocks
606    // ToolCallAggregator handles partial JSON and assembles complete tool calls
607    // The scan() combinator maintains state across stream items
608    let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
609        let result = match chunk_result {
610            Ok(chunk) => match aggregator.process_chunk(chunk) {
611                Ok(blocks) => {
612                    if blocks.is_empty() {
613                        Some(None) // Intermediate chunk, continue streaming
614                    } else {
615                        Some(Some(Ok(blocks))) // Complete block(s) ready
616                    }
617                }
618                Err(e) => Some(Some(Err(e))), // Propagate processing error
619            },
620            Err(e) => Some(Some(Err(e))), // Propagate stream error
621        };
622        futures::future::ready(result)
623    });
624
625    // Flatten the stream to emit individual blocks
626    // filter_map removes None values (incomplete chunks)
627    // flat_map expands Vec<ContentBlock> into individual items
628    let flattened = stream
629        .filter_map(|item| async move { item })
630        .flat_map(|result| {
631            futures::stream::iter(match result {
632                Ok(blocks) => blocks.into_iter().map(Ok).collect(),
633                Err(e) => vec![Err(e)],
634            })
635        });
636
637    // Pin and box the stream for type erasure and safe async usage
638    Ok(Box::pin(flattened))
639}
640
641/// Stateful client for multi-turn conversations with automatic history management.
642///
643/// The `Client` is the primary interface for building conversational AI applications.
644/// It maintains conversation history, manages streaming responses, and provides two
645/// modes of operation: manual and automatic tool execution.
646///
647/// # State Management
648///
649/// The client maintains several pieces of state that persist across multiple turns:
650///
651/// - **Conversation History**: Complete record of all messages exchanged
652/// - **Active Stream**: Currently active SSE stream being consumed
653/// - **Interrupt Flag**: Thread-safe cancellation signal
654/// - **Auto-Execution Buffer**: Cached blocks for auto-execution mode
655///
656/// # Operating Modes
657///
658/// ## Manual Mode (default)
659///
660/// In manual mode, the client streams blocks directly to the caller. When the model
661/// requests a tool, you receive a `ToolUseBlock`, execute the tool yourself, add the
662/// result with `add_tool_result()`, and continue the conversation.
663///
664/// **Advantages**:
665/// - Full control over tool execution
666/// - Custom error handling per tool
667/// - Ability to modify tool inputs/outputs
668/// - Interactive debugging capabilities
669///
670/// ## Automatic Mode (`auto_execute_tools = true`)
671///
672/// In automatic mode, the client executes tools transparently and only returns the
673/// final text response after all tool iterations complete.
674///
675/// **Advantages**:
676/// - Simpler API for common use cases
677/// - Built-in retry logic via hooks
678/// - Automatic conversation continuation
679/// - Configurable iteration limits
680///
681/// # Thread Safety
682///
683/// The client is NOT thread-safe for concurrent use. However, the interrupt mechanism
684/// uses `Arc<AtomicBool>` which can be safely shared across threads to signal cancellation.
685///
686/// # Memory Management
687///
688/// - History grows unbounded by default (consider clearing periodically)
689/// - Streams are consumed lazily (low memory footprint during streaming)
690/// - Auto-execution buffers entire response (higher memory in auto mode)
691///
692/// # Examples
693///
694/// ## Basic Multi-Turn Conversation
695///
696/// ```rust,no_run
697/// use open_agent::{Client, AgentOptions, ContentBlock};
698///
699/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
700/// let mut client = Client::new(AgentOptions::builder()
701///     .model("gpt-4")
702///     .api_key("sk-...")
703///     .build()?)?;
704///
705/// // First question
706/// client.send("What's the capital of France?").await?;
707/// while let Some(block) = client.receive().await? {
708///     if let ContentBlock::Text(text) = block {
709///         println!("{}", text.text); // "Paris is the capital of France."
710///     }
711/// }
712///
713/// // Follow-up question - history is automatically maintained
714/// client.send("What's its population?").await?;
715/// while let Some(block) = client.receive().await? {
716///     if let ContentBlock::Text(text) = block {
717///         println!("{}", text.text); // "Paris has approximately 2.2 million people."
718///     }
719/// }
720/// # Ok(())
721/// # }
722/// ```
723///
724/// ## Manual Tool Execution
725///
726/// ```rust,no_run
727/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
728/// use serde_json::json;
729///
730/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
731/// let calculator = Tool::new(
732///     "calculator",
733///     "Performs arithmetic",
734///     json!({"type": "object"}),
735///     |input| Box::pin(async move { Ok(json!({"result": 42})) })
736/// );
737///
738/// let mut client = Client::new(AgentOptions::builder()
739///     .model("gpt-4")
740///     .api_key("sk-...")
741///     .tools(vec![calculator])
742///     .build()?)?;
743///
744/// client.send("What's 2+2?").await?;
745///
746/// while let Some(block) = client.receive().await? {
747///     match block {
748///         ContentBlock::ToolUse(tool_use) => {
749///             // Execute tool manually
750///             let result = json!({"result": 4});
751///             client.add_tool_result(&tool_use.id, result)?;
752///
753///             // Continue conversation to get model's response
754///             client.send("").await?;
755///         }
756///         ContentBlock::Text(text) => {
757///             println!("{}", text.text); // "The result is 4."
758///         }
759///         _ => {}
760///     }
761/// }
762/// # Ok(())
763/// # }
764/// ```
765///
766/// ## Automatic Tool Execution
767///
768/// ```rust,no_run
769/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
770/// use serde_json::json;
771///
772/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
773/// let calculator = Tool::new(
774///     "calculator",
775///     "Performs arithmetic",
776///     json!({"type": "object"}),
777///     |input| Box::pin(async move { Ok(json!({"result": 42})) })
778/// );
779///
780/// let mut client = Client::new(AgentOptions::builder()
781///     .model("gpt-4")
782///     .api_key("sk-...")
783///     .tools(vec![calculator])
784///     .auto_execute_tools(true)  // Enable auto-execution
785///     .build()?)?;
786///
787/// client.send("What's 2+2?").await?;
788///
789/// // Tools execute automatically - you only receive final text
790/// while let Some(block) = client.receive().await? {
791///     if let ContentBlock::Text(text) = block {
792///         println!("{}", text.text); // "The result is 4."
793///     }
794/// }
795/// # Ok(())
796/// # }
797/// ```
798///
799/// ## With Interruption
800///
801/// ```rust,no_run
802/// use open_agent::{Client, AgentOptions};
803/// use std::time::Duration;
804///
805/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
806/// let mut client = Client::new(AgentOptions::default())?;
807///
808/// // Start a long-running query
809/// client.send("Write a very long story").await?;
810///
811/// // Spawn a task to interrupt after timeout
812/// let interrupt_handle = client.interrupt_handle();
813/// tokio::spawn(async move {
814///     tokio::time::sleep(Duration::from_secs(5)).await;
815///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
816/// });
817///
818/// // This loop will stop when interrupted
819/// while let Some(block) = client.receive().await? {
820///     // Process blocks...
821/// }
822///
823/// // Client is still usable after interruption
824/// client.send("What's 2+2?").await?;
825/// # Ok(())
826/// # }
827/// ```
828pub struct Client {
829    /// Configuration options including model, API key, tools, hooks, etc.
830    ///
831    /// This field contains all the settings that control how the client behaves.
832    /// It's set once during construction and cannot be modified (though you can
833    /// access it via `options()` for inspection).
834    options: AgentOptions,
835
836    /// Complete conversation history as a sequence of messages.
837    ///
838    /// Each message contains a role (System/User/Assistant/Tool) and content blocks.
839    /// History grows unbounded by default - use `clear_history()` to reset.
840    ///
841    /// **Important**: The history includes ALL messages, not just user/assistant.
842    /// This includes tool results and intermediate assistant messages from tool calls.
843    history: Vec<Message>,
844
845    /// Currently active SSE stream being consumed.
846    ///
847    /// This is `Some(stream)` while a response is being received, and `None` when
848    /// no request is in flight or after a response completes.
849    ///
850    /// The stream is set by `send()` and consumed by `receive()`. When the stream
851    /// is exhausted, `receive()` returns `Ok(None)` and sets this back to `None`.
852    current_stream: Option<ContentStream>,
853
854    /// Reusable HTTP client for making API requests.
855    ///
856    /// Configured once during construction with the timeout from `AgentOptions`.
857    /// Reusing the same client across requests enables connection pooling and
858    /// better performance for multi-turn conversations.
859    http_client: reqwest::Client,
860
861    /// Thread-safe interrupt flag for cancellation.
862    ///
863    /// This `Arc<AtomicBool>` can be cloned and shared across threads or async tasks
864    /// to signal cancellation. When set to `true`, the next `receive()` call will
865    /// return `Ok(None)` and clear the current stream.
866    ///
867    /// The flag is automatically reset to `false` at the start of each `send()` call.
868    ///
869    /// **Thread Safety**: Can be safely accessed from multiple threads using atomic
870    /// operations. However, only one thread should call `send()`/`receive()`.
871    interrupted: Arc<AtomicBool>,
872
873    /// Buffer of content blocks for auto-execution mode.
874    ///
875    /// When `auto_execute_tools` is enabled, `receive()` internally calls the
876    /// auto-execution loop which buffers all final text blocks here. Subsequent
877    /// calls to `receive()` return blocks from this buffer one at a time.
878    ///
879    /// **Only used when `options.auto_execute_tools == true`**.
880    ///
881    /// The buffer is cleared when starting a new auto-execution loop.
882    auto_exec_buffer: Vec<ContentBlock>,
883
884    /// Current read position in the auto-execution buffer.
885    ///
886    /// Tracks which block to return next when `receive()` is called in auto mode.
887    /// Reset to 0 when the buffer is refilled with a new response.
888    ///
889    /// **Only used when `options.auto_execute_tools == true`**.
890    auto_exec_index: usize,
891}
892
893impl Client {
894    /// Creates a new client with the specified configuration.
895    ///
896    /// This constructor initializes all state fields and creates a reusable HTTP client
897    /// configured with the timeout from `AgentOptions`.
898    ///
899    /// # Parameters
900    ///
901    /// - `options`: Configuration including model, API key, tools, hooks, etc.
902    ///
903    /// # Errors
904    ///
905    /// Returns an error if the HTTP client cannot be built. This can happen due to:
906    /// - Invalid TLS configuration
907    /// - System resource exhaustion
908    /// - Invalid timeout values
909    ///
910    /// # Examples
911    ///
912    /// ```rust
913    /// use open_agent::{Client, AgentOptions};
914    ///
915    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
916    /// let client = Client::new(AgentOptions::builder()
917    ///     .model("gpt-4")
918    ///     .base_url("http://localhost:1234/v1")
919    ///     .build()?)?;
920    /// # Ok(())
921    /// # }
922    /// ```
923    pub fn new(options: AgentOptions) -> Result<Self> {
924        // Build HTTP client with configured timeout
925        // This client is reused across all requests for connection pooling
926        let http_client = reqwest::Client::builder()
927            .timeout(Duration::from_secs(options.timeout()))
928            .build()
929            .map_err(|e| Error::config(format!("Failed to build HTTP client: {}", e)))?;
930
931        Ok(Self {
932            options,
933            history: Vec::new(),  // Empty conversation history
934            current_stream: None, // No active stream yet
935            http_client,
936            interrupted: Arc::new(AtomicBool::new(false)), // Not interrupted initially
937            auto_exec_buffer: Vec::new(),                  // Empty buffer for auto mode
938            auto_exec_index: 0,                            // Start at beginning of buffer
939        })
940    }
941
942    /// Sends a user message and initiates streaming of the model's response.
943    ///
944    /// This method performs several critical steps:
945    ///
946    /// 1. Executes UserPromptSubmit hooks (which can modify or block the prompt)
947    /// 2. Adds the user message to conversation history
948    /// 3. Builds and sends HTTP request to the OpenAI-compatible API
949    /// 4. Parses the SSE stream and sets up aggregation
950    /// 5. Stores the stream for consumption via `receive()`
951    ///
952    /// # Parameters
953    ///
954    /// - `prompt`: The user's message. Can be empty to continue conversation after
955    ///   adding tool results (common pattern in manual tool execution mode).
956    ///
957    /// # Returns
958    ///
959    /// - `Ok(())`: Request sent successfully, call `receive()` to get blocks
960    /// - `Err(e)`: Request failed (network error, API error, hook blocked, etc.)
961    ///
962    /// # Behavior Details
963    ///
964    /// ## Hook Execution
965    ///
966    /// Before sending, UserPromptSubmit hooks are executed. Hooks can:
967    /// - Modify the prompt text
968    /// - Block the request entirely
969    /// - Access conversation history
970    ///
971    /// If a hook blocks the request, this method returns an error immediately.
972    ///
973    /// ## History Management
974    ///
975    /// The prompt is added to history BEFORE sending the request. This ensures
976    /// that history is consistent even if the request fails.
977    ///
978    /// ## Stream Setup
979    ///
980    /// The response stream is set up but not consumed. You must call `receive()`
981    /// repeatedly to get content blocks. The stream remains active until:
982    /// - All blocks are consumed (stream naturally ends)
983    /// - An error occurs
984    /// - Interrupt is triggered
985    ///
986    /// ## Interrupt Handling
987    ///
988    /// The interrupt flag is reset to `false` at the start of this method,
989    /// allowing a fresh request after a previous interruption.
990    ///
991    /// # State Changes
992    ///
993    /// - Resets `interrupted` flag to `false`
994    /// - Appends user message to `history`
995    /// - Sets `current_stream` to new SSE stream
996    /// - Does NOT modify `auto_exec_buffer` or `auto_exec_index`
997    ///
998    /// # Errors
999    ///
1000    /// Returns errors for:
1001    /// - Hook blocking the prompt
1002    /// - HTTP client errors (network failure, DNS, etc.)
1003    /// - API errors (auth failure, invalid model, rate limits)
1004    /// - Invalid response format
1005    ///
1006    /// After an error, the client remains usable for new requests.
1007    ///
1008    /// # Examples
1009    ///
1010    /// ## Basic Usage
1011    ///
1012    /// ```rust,no_run
1013    /// # use open_agent::{Client, AgentOptions};
1014    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1015    /// # let mut client = Client::new(AgentOptions::default())?;
1016    /// client.send("Hello!").await?;
1017    ///
1018    /// while let Some(block) = client.receive().await? {
1019    ///     // Process blocks...
1020    /// }
1021    /// # Ok(())
1022    /// # }
1023    /// ```
1024    ///
1025    /// ## Continuing After Tool Result
1026    ///
1027    /// ```rust,no_run
1028    /// # use open_agent::{Client, AgentOptions, ContentBlock};
1029    /// # use serde_json::json;
1030    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1031    /// # let mut client = Client::new(AgentOptions::default())?;
1032    /// client.send("Use the calculator").await?;
1033    ///
1034    /// while let Some(block) = client.receive().await? {
1035    ///     if let ContentBlock::ToolUse(tool_use) = block {
1036    ///         // Execute tool and add result
1037    ///         client.add_tool_result(&tool_use.id, json!({"result": 42}))?;
1038    ///
1039    ///         // Continue conversation with empty prompt
1040    ///         client.send("").await?;
1041    ///     }
1042    /// }
1043    /// # Ok(())
1044    /// # }
1045    /// ```
1046    pub async fn send(&mut self, prompt: &str) -> Result<()> {
1047        use crate::hooks::UserPromptSubmitEvent;
1048
1049        // Reset interrupt flag for new query
1050        // This allows the client to be reused after a previous interruption
1051        // Uses SeqCst ordering to ensure visibility across all threads
1052        self.interrupted.store(false, Ordering::SeqCst);
1053
1054        // Execute UserPromptSubmit hooks
1055        // Hooks run BEFORE adding to history, allowing modification or blocking
1056        let mut final_prompt = prompt.to_string();
1057        let history_snapshot: Vec<serde_json::Value> = self
1058            .history
1059            .iter()
1060            .map(|_| serde_json::json!({})) // Simplified snapshot for hooks
1061            .collect();
1062
1063        // Create hook event with current prompt and history
1064        let event = UserPromptSubmitEvent::new(final_prompt.clone(), history_snapshot);
1065
1066        // Execute all registered UserPromptSubmit hooks
1067        if let Some(decision) = self.options.hooks().execute_user_prompt_submit(event).await {
1068            // Check if hook wants to block execution
1069            if !decision.continue_execution() {
1070                return Err(Error::other(format!(
1071                    "Prompt blocked by hook: {}",
1072                    decision.reason().unwrap_or("")
1073                )));
1074            }
1075            // Apply any prompt modifications from hooks
1076            if let Some(modified) = decision.modified_prompt() {
1077                final_prompt = modified.to_string();
1078            }
1079        }
1080
1081        // Add user message to history BEFORE sending request
1082        // This ensures history consistency even if request fails
1083        // Empty prompts are still added (needed for tool continuation)
1084        self.history.push(Message::user(final_prompt));
1085
1086        // Build messages array for API request
1087        // This includes system prompt + full conversation history
1088        let mut messages = Vec::new();
1089
1090        // Add system prompt as first message if configured
1091        // System prompts are added fresh for each request (not from history)
1092        if !self.options.system_prompt().is_empty() {
1093            messages.push(OpenAIMessage {
1094                role: "system".to_string(),
1095                content: self.options.system_prompt().to_string(),
1096                tool_calls: None,
1097                tool_call_id: None,
1098            });
1099        }
1100
1101        // Convert conversation history to OpenAI message format
1102        // This includes user prompts, assistant responses, and tool results
1103        for msg in &self.history {
1104            // Extract text content from all content blocks
1105            //
1106            // KNOWN LIMITATION: ToolUse and ToolResult blocks are not serialized to OpenAI format.
1107            // This works for the current streaming use case where tool calls are handled via
1108            // ContentBlock::ToolUse/ToolResult in the response stream, but means conversation
1109            // history cannot be fully round-tripped through OpenAI format. To implement proper
1110            // serialization:
1111            // 1. When processing ContentBlock::ToolUse, populate tool_calls array
1112            // 2. When processing ContentBlock::ToolResult, populate tool_call_id field
1113            // 3. Handle the different message structures (assistant with tool_calls vs tool with tool_call_id)
1114            let content = msg
1115                .content
1116                .iter()
1117                .filter_map(|block| match block {
1118                    ContentBlock::Text(text) => Some(text.text.clone()),
1119                    _ => None, // Skip ToolUse and ToolResult blocks (see limitation above)
1120                })
1121                .collect::<Vec<_>>()
1122                .join("\n");
1123
1124            // Convert our MessageRole enum to OpenAI role string
1125            messages.push(OpenAIMessage {
1126                role: match msg.role {
1127                    MessageRole::System => "system",
1128                    MessageRole::User => "user",
1129                    MessageRole::Assistant => "assistant",
1130                    MessageRole::Tool => "tool",
1131                }
1132                .to_string(),
1133                content,
1134                tool_calls: None, // LIMITATION: Not populated from ToolUse blocks (see above)
1135                tool_call_id: None, // LIMITATION: Not populated from ToolResult blocks (see above)
1136            });
1137        }
1138
1139        // Convert tools to OpenAI format if any are registered
1140        // Each tool is described with name, description, and JSON Schema parameters
1141        let tools = if !self.options.tools().is_empty() {
1142            Some(
1143                self.options
1144                    .tools()
1145                    .iter()
1146                    .map(|t| t.to_openai_format())
1147                    .collect(),
1148            )
1149        } else {
1150            None
1151        };
1152
1153        // Build the OpenAI-compatible request payload
1154        let request = OpenAIRequest {
1155            model: self.options.model().to_string(),
1156            messages,
1157            stream: true, // Always stream for progressive rendering
1158            max_tokens: self.options.max_tokens(),
1159            temperature: Some(self.options.temperature()),
1160            tools,
1161        };
1162
1163        // Make HTTP POST request to chat completions endpoint
1164        let url = format!("{}/chat/completions", self.options.base_url());
1165        let response = self
1166            .http_client
1167            .post(&url)
1168            .header(
1169                "Authorization",
1170                format!("Bearer {}", self.options.api_key()),
1171            )
1172            .header("Content-Type", "application/json")
1173            .json(&request)
1174            .send()
1175            .await
1176            .map_err(Error::Http)?;
1177
1178        // Check for HTTP-level errors before processing stream
1179        // This catches authentication, rate limits, invalid models, etc.
1180        if !response.status().is_success() {
1181            let status = response.status();
1182            let body = response.text().await.unwrap_or_else(|e| {
1183                eprintln!("WARNING: Failed to read error response body: {}", e);
1184                "Unknown error (failed to read response body)".to_string()
1185            });
1186            return Err(Error::api(format!("API error {}: {}", status, body)));
1187        }
1188
1189        // Parse Server-Sent Events (SSE) stream from response
1190        let sse_stream = parse_sse_stream(response);
1191
1192        // Aggregate SSE chunks into complete content blocks
1193        // ToolCallAggregator maintains state to handle incremental JSON chunks
1194        // that may arrive split across multiple SSE events
1195        let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
1196            let result = match chunk_result {
1197                Ok(chunk) => match aggregator.process_chunk(chunk) {
1198                    Ok(blocks) => {
1199                        if blocks.is_empty() {
1200                            Some(None) // Partial chunk, keep aggregating
1201                        } else {
1202                            Some(Some(Ok(blocks))) // Complete block(s) ready
1203                        }
1204                    }
1205                    Err(e) => Some(Some(Err(e))), // Processing error
1206                },
1207                Err(e) => Some(Some(Err(e))), // Stream error
1208            };
1209            futures::future::ready(result)
1210        });
1211
1212        // Flatten the stream to emit individual blocks
1213        // filter_map removes None values (partial chunks)
1214        // flat_map converts Vec<ContentBlock> to individual items
1215        let flattened = stream
1216            .filter_map(|item| async move { item })
1217            .flat_map(|result| {
1218                futures::stream::iter(match result {
1219                    Ok(blocks) => blocks.into_iter().map(Ok).collect(),
1220                    Err(e) => vec![Err(e)],
1221                })
1222            });
1223
1224        // Store the stream for consumption via receive()
1225        // The stream is NOT consumed here - that happens in receive()
1226        self.current_stream = Some(Box::pin(flattened));
1227
1228        Ok(())
1229    }
1230
1231    /// Internal method that returns one block from the current stream.
1232    ///
1233    /// This is the core streaming logic extracted for reuse by both manual mode
1234    /// and auto-execution mode. It handles interrupt checking and stream consumption.
1235    ///
1236    /// # Returns
1237    ///
1238    /// - `Ok(Some(block))`: Successfully received a content block
1239    /// - `Ok(None)`: Stream ended naturally or was interrupted
1240    /// - `Err(e)`: An error occurred during streaming
1241    ///
1242    /// # State Changes
1243    ///
1244    /// - Sets `current_stream` to `None` if interrupted or stream ends
1245    /// - Does not modify history or other state
1246    ///
1247    /// # Implementation Notes
1248    ///
1249    /// This method checks the interrupt flag on every call, allowing responsive
1250    /// cancellation. The check uses SeqCst ordering for immediate visibility of
1251    /// interrupts from other threads.
1252    async fn receive_one(&mut self) -> Result<Option<ContentBlock>> {
1253        // Check interrupt flag before attempting to receive
1254        // Uses SeqCst to ensure we see the latest value from any thread
1255        if self.interrupted.load(Ordering::SeqCst) {
1256            // Clear the stream and return None to signal completion
1257            self.current_stream = None;
1258            return Ok(None);
1259        }
1260
1261        // Poll the current stream if one exists
1262        if let Some(stream) = &mut self.current_stream {
1263            match stream.next().await {
1264                Some(Ok(block)) => Ok(Some(block)), // Got a block
1265                Some(Err(e)) => Err(e),             // Stream error
1266                None => Ok(None),                   // Stream ended
1267            }
1268        } else {
1269            // No active stream
1270            Ok(None)
1271        }
1272    }
1273
1274    /// Collects all blocks from the current stream into a vector.
1275    ///
1276    /// Internal helper for auto-execution mode. This method buffers the entire
1277    /// response in memory, which is necessary to determine if the response contains
1278    /// tool calls before returning anything to the caller.
1279    ///
1280    /// # Returns
1281    ///
1282    /// - `Ok(vec)`: Successfully collected all blocks
1283    /// - `Err(e)`: Error during collection or interrupted
1284    ///
1285    /// # Memory Usage
1286    ///
1287    /// This buffers the entire response, which can be large for long completions.
1288    /// Consider the memory implications when using auto-execution mode.
1289    ///
1290    /// # Interruption
1291    ///
1292    /// Checks interrupt flag during collection and returns error if interrupted.
1293    async fn collect_all_blocks(&mut self) -> Result<Vec<ContentBlock>> {
1294        let mut blocks = Vec::new();
1295
1296        // Consume entire stream into vector
1297        while let Some(block) = self.receive_one().await? {
1298            // Check interrupt during collection for responsiveness
1299            if self.interrupted.load(Ordering::SeqCst) {
1300                self.current_stream = None;
1301                return Err(Error::other(
1302                    "Operation interrupted during block collection",
1303                ));
1304            }
1305
1306            blocks.push(block);
1307        }
1308
1309        Ok(blocks)
1310    }
1311
1312    /// Executes a tool by name with the given input.
1313    ///
1314    /// Internal helper for auto-execution mode. Looks up the tool in the registered
1315    /// tools list and executes it with the provided input.
1316    ///
1317    /// # Parameters
1318    ///
1319    /// - `tool_name`: Name of the tool to execute
1320    /// - `input`: JSON value containing tool parameters
1321    ///
1322    /// # Returns
1323    ///
1324    /// - `Ok(result)`: Tool executed successfully, returns result as JSON
1325    /// - `Err(e)`: Tool not found or execution failed
1326    ///
1327    /// # Error Handling
1328    ///
1329    /// If the tool is not found in the registry, returns a ToolError.
1330    /// If execution fails, the error from the tool is propagated.
1331    async fn execute_tool_internal(
1332        &self,
1333        tool_name: &str,
1334        input: serde_json::Value,
1335    ) -> Result<serde_json::Value> {
1336        // Find tool in registered tools by name
1337        let tool = self
1338            .options
1339            .tools()
1340            .iter()
1341            .find(|t| t.name() == tool_name)
1342            .ok_or_else(|| Error::tool(format!("Tool '{}' not found", tool_name)))?;
1343
1344        // Execute the tool's async function
1345        tool.execute(input).await
1346    }
1347
1348    /// Auto-execution loop that handles tool calls automatically.
1349    ///
1350    /// This is the core implementation of automatic tool execution mode. It:
1351    ///
1352    /// 1. Collects all blocks from the current stream
1353    /// 2. Separates text blocks from tool use blocks
1354    /// 3. If there are tool blocks:
1355    ///    - Executes PreToolUse hooks (can modify/block)
1356    ///    - Executes each tool via its registered function
1357    ///    - Executes PostToolUse hooks (can modify result)
1358    ///    - Adds results to history
1359    ///    - Continues conversation with send("")
1360    /// 4. Repeats until text-only response or max iterations
1361    /// 5. Returns all final text blocks
1362    ///
1363    /// # Returns
1364    ///
1365    /// - `Ok(blocks)`: Final text blocks after all tool iterations
1366    /// - `Err(e)`: Error during execution, stream processing, or interruption
1367    ///
1368    /// # Iteration Limit
1369    ///
1370    /// The loop is bounded by `options.max_tool_iterations` to prevent infinite loops.
1371    /// When the limit is reached, the loop stops and returns whatever text blocks
1372    /// have been collected so far.
1373    ///
1374    /// # Hook Integration
1375    ///
1376    /// Hooks are executed for each tool call:
1377    /// - **PreToolUse**: Can modify input or block execution entirely
1378    /// - **PostToolUse**: Can modify the result before it's added to history
1379    ///
1380    /// If a hook blocks execution, a JSON error response is used as the tool result.
1381    ///
1382    /// # State Management
1383    ///
1384    /// The loop maintains history by adding:
1385    /// - Assistant messages with text + tool use blocks
1386    /// - User messages with tool result blocks
1387    ///
1388    /// This creates a proper conversation flow that the model can follow.
1389    ///
1390    /// # Error Recovery
1391    ///
1392    /// If a tool execution fails, the error is converted to a JSON error response
1393    /// and added as the tool result. This allows the conversation to continue
1394    /// and lets the model handle the error.
1395    async fn auto_execute_loop(&mut self) -> Result<Vec<ContentBlock>> {
1396        use crate::types::ToolResultBlock;
1397
1398        // Track iterations to prevent infinite loops
1399        let mut iteration = 0;
1400        let max_iterations = self.options.max_tool_iterations();
1401
1402        loop {
1403            // ========================================================================
1404            // STEP 1: Collect all blocks from current stream
1405            // ========================================================================
1406            // Buffer the entire response to determine if it contains tool calls
1407            let blocks = self.collect_all_blocks().await?;
1408
1409            // Empty response means stream ended or was interrupted
1410            if blocks.is_empty() {
1411                return Ok(Vec::new());
1412            }
1413
1414            // ========================================================================
1415            // STEP 2: Separate text blocks from tool use blocks
1416            // ========================================================================
1417            // The model can return a mix of text and tool calls in one response
1418            let mut text_blocks = Vec::new();
1419            let mut tool_blocks = Vec::new();
1420
1421            for block in blocks {
1422                match block {
1423                    ContentBlock::Text(_) => text_blocks.push(block),
1424                    ContentBlock::ToolUse(_) => tool_blocks.push(block),
1425                    _ => {} // Ignore ToolResult and other variants
1426                }
1427            }
1428
1429            // ========================================================================
1430            // STEP 3: Check if we're done (no tool calls)
1431            // ========================================================================
1432            // If the response contains no tool calls, we've reached the final answer
1433            if tool_blocks.is_empty() {
1434                // Add assistant's final text response to history
1435                if !text_blocks.is_empty() {
1436                    let assistant_msg = Message::assistant(text_blocks.clone());
1437                    self.history.push(assistant_msg);
1438                }
1439                // Return text blocks to caller via buffered receive()
1440                return Ok(text_blocks);
1441            }
1442
1443            // ========================================================================
1444            // STEP 4: Check iteration limit BEFORE executing tools
1445            // ========================================================================
1446            // Increment counter and check if we've hit the max
1447            iteration += 1;
1448            if iteration > max_iterations {
1449                // Max iterations reached - stop execution and return what we have
1450                // This prevents infinite tool-calling loops
1451                if !text_blocks.is_empty() {
1452                    let assistant_msg = Message::assistant(text_blocks.clone());
1453                    self.history.push(assistant_msg);
1454                }
1455                return Ok(text_blocks);
1456            }
1457
1458            // ========================================================================
1459            // STEP 5: Add assistant message to history
1460            // ========================================================================
1461            // The assistant message includes BOTH text and tool use blocks
1462            // This preserves the full context for future turns
1463            let mut all_blocks = text_blocks.clone();
1464            all_blocks.extend(tool_blocks.clone());
1465            let assistant_msg = Message::assistant(all_blocks);
1466            self.history.push(assistant_msg);
1467
1468            // ========================================================================
1469            // STEP 6: Execute all tools and collect results
1470            // ========================================================================
1471            for block in tool_blocks {
1472                if let ContentBlock::ToolUse(tool_use) = block {
1473                    // Create simplified history snapshot for hooks
1474                    // TODO: Full serialization of history for hooks
1475                    let history_snapshot: Vec<serde_json::Value> =
1476                        self.history.iter().map(|_| serde_json::json!({})).collect();
1477
1478                    // ============================================================
1479                    // Execute PreToolUse hooks
1480                    // ============================================================
1481                    use crate::hooks::PreToolUseEvent;
1482                    let pre_event = PreToolUseEvent::new(
1483                        tool_use.name.clone(),
1484                        tool_use.input.clone(),
1485                        tool_use.id.clone(),
1486                        history_snapshot.clone(),
1487                    );
1488
1489                    // Track whether to execute and what input to use
1490                    let mut tool_input = tool_use.input.clone();
1491                    let mut should_execute = true;
1492                    let mut block_reason = None;
1493
1494                    // Execute all PreToolUse hooks
1495                    if let Some(decision) =
1496                        self.options.hooks().execute_pre_tool_use(pre_event).await
1497                    {
1498                        if !decision.continue_execution() {
1499                            // Hook blocked execution
1500                            should_execute = false;
1501                            block_reason = decision.reason().map(|s| s.to_string());
1502                        } else if let Some(modified) = decision.modified_input() {
1503                            // Hook modified the input
1504                            tool_input = modified.clone();
1505                        }
1506                    }
1507
1508                    // ============================================================
1509                    // Execute tool (or create error result if blocked)
1510                    // ============================================================
1511                    let result = if should_execute {
1512                        // Actually execute the tool
1513                        match self
1514                            .execute_tool_internal(&tool_use.name, tool_input.clone())
1515                            .await
1516                        {
1517                            Ok(res) => res, // Success - use the result
1518                            Err(e) => {
1519                                // Tool execution failed - convert to JSON error
1520                                // This allows the conversation to continue
1521                                serde_json::json!({
1522                                    "error": e.to_string(),
1523                                    "tool": tool_use.name,
1524                                    "id": tool_use.id
1525                                })
1526                            }
1527                        }
1528                    } else {
1529                        // Tool blocked by PreToolUse hook - create error result
1530                        serde_json::json!({
1531                            "error": "Tool execution blocked by hook",
1532                            "reason": block_reason.unwrap_or_else(|| "No reason provided".to_string()),
1533                            "tool": tool_use.name,
1534                            "id": tool_use.id
1535                        })
1536                    };
1537
1538                    // ============================================================
1539                    // Execute PostToolUse hooks
1540                    // ============================================================
1541                    use crate::hooks::PostToolUseEvent;
1542                    let post_event = PostToolUseEvent::new(
1543                        tool_use.name.clone(),
1544                        tool_input,
1545                        tool_use.id.clone(),
1546                        result.clone(),
1547                        history_snapshot,
1548                    );
1549
1550                    let mut final_result = result;
1551                    if let Some(decision) =
1552                        self.options.hooks().execute_post_tool_use(post_event).await
1553                    {
1554                        // PostToolUse can modify the result
1555                        // Note: Uses modified_input field (naming is historical)
1556                        if let Some(modified) = decision.modified_input() {
1557                            final_result = modified.clone();
1558                        }
1559                    }
1560
1561                    // ============================================================
1562                    // Add tool result to history
1563                    // ============================================================
1564                    // Tool results are added as user messages (per OpenAI convention)
1565                    let tool_result = ToolResultBlock::new(&tool_use.id, final_result);
1566                    let tool_result_msg =
1567                        Message::user_with_blocks(vec![ContentBlock::ToolResult(tool_result)]);
1568                    self.history.push(tool_result_msg);
1569                }
1570            }
1571
1572            // ========================================================================
1573            // STEP 7: Continue conversation to get next response
1574            // ========================================================================
1575            // Send empty string to continue - the history contains all context
1576            self.send("").await?;
1577
1578            // Loop continues to collect and process the next response
1579            // This will either be more tool calls or the final text answer
1580        }
1581    }
1582
1583    /// Receives the next content block from the current stream.
1584    ///
1585    /// This is the primary method for consuming responses from the model. It works
1586    /// differently depending on the operating mode:
1587    ///
1588    /// ## Manual Mode (default)
1589    ///
1590    /// Streams blocks directly from the API response as they arrive. You receive:
1591    /// - `TextBlock`: Incremental text from the model
1592    /// - `ToolUseBlock`: Requests to execute tools
1593    /// - Other block types as they're emitted
1594    ///
1595    /// When you receive a `ToolUseBlock`, you must:
1596    /// 1. Execute the tool yourself
1597    /// 2. Call `add_tool_result()` with the result
1598    /// 3. Call `send("")` to continue the conversation
1599    ///
1600    /// ## Automatic Mode (`auto_execute_tools = true`)
1601    ///
1602    /// Transparently executes tools and only returns final text blocks. The first
1603    /// call to `receive()` triggers the auto-execution loop which:
1604    /// 1. Collects all blocks from the stream
1605    /// 2. Executes any tool calls automatically
1606    /// 3. Continues the conversation until reaching a text-only response
1607    /// 4. Buffers the final text blocks
1608    /// 5. Returns them one at a time on subsequent `receive()` calls
1609    ///
1610    /// # Returns
1611    ///
1612    /// - `Ok(Some(block))`: Successfully received a content block
1613    /// - `Ok(None)`: Stream ended normally or was interrupted
1614    /// - `Err(e)`: An error occurred during streaming or tool execution
1615    ///
1616    /// # Behavior Details
1617    ///
1618    /// ## Interruption
1619    ///
1620    /// Checks the interrupt flag on every call. If interrupted, immediately returns
1621    /// `Ok(None)` and clears the stream. The client can be reused after interruption.
1622    ///
1623    /// ## Stream Lifecycle
1624    ///
1625    /// 1. After `send()`, stream is active
1626    /// 2. Each `receive()` call yields one block
1627    /// 3. When stream ends, returns `Ok(None)`
1628    /// 4. Subsequent calls continue returning `Ok(None)` until next `send()`
1629    ///
1630    /// ## Auto-Execution Buffer
1631    ///
1632    /// In auto mode, blocks are buffered in memory. The buffer persists until
1633    /// fully consumed (index reaches length), at which point it's cleared.
1634    ///
1635    /// # State Changes
1636    ///
1637    /// - Advances stream position
1638    /// - In auto mode: May trigger entire execution loop and modify history
1639    /// - In manual mode: Only reads from stream, no history changes
1640    /// - Increments `auto_exec_index` when returning buffered blocks
1641    ///
1642    /// # Examples
1643    ///
1644    /// ## Manual Mode - Basic
1645    ///
1646    /// ```rust,no_run
1647    /// # use open_agent::{Client, AgentOptions, ContentBlock};
1648    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1649    /// # let mut client = Client::new(AgentOptions::default())?;
1650    /// client.send("Hello!").await?;
1651    ///
1652    /// while let Some(block) = client.receive().await? {
1653    ///     match block {
1654    ///         ContentBlock::Text(text) => print!("{}", text.text),
1655    ///         _ => {}
1656    ///     }
1657    /// }
1658    /// # Ok(())
1659    /// # }
1660    /// ```
1661    ///
1662    /// ## Manual Mode - With Tools
1663    ///
1664    /// ```rust,no_run
1665    /// # use open_agent::{Client, AgentOptions, ContentBlock};
1666    /// # use serde_json::json;
1667    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1668    /// # let mut client = Client::new(AgentOptions::default())?;
1669    /// client.send("Use the calculator").await?;
1670    ///
1671    /// while let Some(block) = client.receive().await? {
1672    ///     match block {
1673    ///         ContentBlock::Text(text) => {
1674    ///             println!("{}", text.text);
1675    ///         }
1676    ///         ContentBlock::ToolUse(tool_use) => {
1677    ///             println!("Executing: {}", tool_use.name);
1678    ///
1679    ///             // Execute tool manually
1680    ///             let result = json!({"result": 42});
1681    ///
1682    ///             // Add result and continue
1683    ///             client.add_tool_result(&tool_use.id, result)?;
1684    ///             client.send("").await?;
1685    ///         }
1686    ///         _ => {}
1687    ///     }
1688    /// }
1689    /// # Ok(())
1690    /// # }
1691    /// ```
1692    ///
1693    /// ## Auto Mode
1694    ///
1695    /// ```rust,no_run
1696    /// # use open_agent::{Client, AgentOptions, ContentBlock, Tool};
1697    /// # use serde_json::json;
1698    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1699    /// let mut client = Client::new(AgentOptions::builder()
1700    ///     .auto_execute_tools(true)
1701    ///     .build()?)?;
1702    ///
1703    /// client.send("Calculate 2+2").await?;
1704    ///
1705    /// // Tools execute automatically - you only get final text
1706    /// while let Some(block) = client.receive().await? {
1707    ///     if let ContentBlock::Text(text) = block {
1708    ///         println!("{}", text.text);
1709    ///     }
1710    /// }
1711    /// # Ok(())
1712    /// # }
1713    /// ```
1714    ///
1715    /// ## With Error Handling
1716    ///
1717    /// ```rust,no_run
1718    /// # use open_agent::{Client, AgentOptions};
1719    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1720    /// # let mut client = Client::new(AgentOptions::default())?;
1721    /// client.send("Hello").await?;
1722    ///
1723    /// loop {
1724    ///     match client.receive().await {
1725    ///         Ok(Some(block)) => {
1726    ///             // Process block
1727    ///         }
1728    ///         Ok(None) => {
1729    ///             // Stream ended
1730    ///             break;
1731    ///         }
1732    ///         Err(e) => {
1733    ///             eprintln!("Error: {}", e);
1734    ///             break;
1735    ///         }
1736    ///     }
1737    /// }
1738    /// # Ok(())
1739    /// # }
1740    /// ```
1741    pub async fn receive(&mut self) -> Result<Option<ContentBlock>> {
1742        // ========================================================================
1743        // AUTO-EXECUTION MODE
1744        // ========================================================================
1745        if self.options.auto_execute_tools() {
1746            // Check if we have buffered blocks to return
1747            // In auto mode, all final text blocks are buffered and returned one at a time
1748            if self.auto_exec_index < self.auto_exec_buffer.len() {
1749                // Return next buffered block
1750                let block = self.auto_exec_buffer[self.auto_exec_index].clone();
1751                self.auto_exec_index += 1;
1752                return Ok(Some(block));
1753            }
1754
1755            // No buffered blocks - need to run auto-execution loop
1756            // This only happens on the first receive() call after send()
1757            if self.auto_exec_buffer.is_empty() {
1758                match self.auto_execute_loop().await {
1759                    Ok(blocks) => {
1760                        // Buffer all final text blocks
1761                        self.auto_exec_buffer = blocks;
1762                        self.auto_exec_index = 0;
1763
1764                        // If no blocks, return None (empty response)
1765                        if self.auto_exec_buffer.is_empty() {
1766                            return Ok(None);
1767                        }
1768
1769                        // Return first buffered block
1770                        let block = self.auto_exec_buffer[0].clone();
1771                        self.auto_exec_index = 1;
1772                        return Ok(Some(block));
1773                    }
1774                    Err(e) => return Err(e),
1775                }
1776            }
1777
1778            // Buffer exhausted - return None
1779            Ok(None)
1780        } else {
1781            // ====================================================================
1782            // MANUAL MODE
1783            // ====================================================================
1784            // Stream blocks directly from API without buffering or auto-execution
1785            self.receive_one().await
1786        }
1787    }
1788
1789    /// Interrupts the current operation by setting the interrupt flag.
1790    ///
1791    /// This method provides a thread-safe way to cancel any in-progress streaming
1792    /// operation. The interrupt flag is checked by `receive()` before each block,
1793    /// allowing responsive cancellation.
1794    ///
1795    /// # Behavior
1796    ///
1797    /// - Sets the atomic interrupt flag to `true`
1798    /// - Next `receive()` call will return `Ok(None)` and clear the stream
1799    /// - Flag is automatically reset to `false` on next `send()` call
1800    /// - Safe to call from any thread (uses atomic operations)
1801    /// - Idempotent: calling multiple times has same effect as calling once
1802    /// - No-op if no operation is in progress
1803    ///
1804    /// # Thread Safety
1805    ///
1806    /// This method uses `Arc<AtomicBool>` internally, which can be safely shared
1807    /// across threads. You can clone the interrupt handle and use it from different
1808    /// threads or async tasks:
1809    ///
1810    /// ```rust,no_run
1811    /// # use open_agent::{Client, AgentOptions};
1812    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1813    /// let mut client = Client::new(AgentOptions::default())?;
1814    /// let interrupt_handle = client.interrupt_handle();
1815    ///
1816    /// // Use from another thread
1817    /// tokio::spawn(async move {
1818    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1819    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
1820    /// });
1821    /// # Ok(())
1822    /// # }
1823    /// ```
1824    ///
1825    /// # State Changes
1826    ///
1827    /// - Sets `interrupted` flag to `true`
1828    /// - Does NOT modify stream, history, or other state directly
1829    /// - Effect takes place on next `receive()` call
1830    ///
1831    /// # Use Cases
1832    ///
1833    /// - User cancellation (e.g., stop button in UI)
1834    /// - Timeout enforcement
1835    /// - Resource cleanup
1836    /// - Emergency shutdown
1837    ///
1838    /// # Examples
1839    ///
1840    /// ## Basic Interruption
1841    ///
1842    /// ```rust,no_run
1843    /// use open_agent::{Client, AgentOptions};
1844    ///
1845    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1846    /// let mut client = Client::new(AgentOptions::default())?;
1847    ///
1848    /// client.send("Tell me a long story").await?;
1849    ///
1850    /// // Interrupt after receiving some blocks
1851    /// let mut count = 0;
1852    /// while let Some(block) = client.receive().await? {
1853    ///     count += 1;
1854    ///     if count >= 5 {
1855    ///         client.interrupt();
1856    ///     }
1857    /// }
1858    ///
1859    /// // Client is ready for new queries
1860    /// client.send("What's 2+2?").await?;
1861    /// # Ok(())
1862    /// # }
1863    /// ```
1864    ///
1865    /// ## With Timeout
1866    ///
1867    /// ```rust,no_run
1868    /// use open_agent::{Client, AgentOptions};
1869    /// use std::time::Duration;
1870    ///
1871    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1872    /// let mut client = Client::new(AgentOptions::default())?;
1873    ///
1874    /// client.send("Long request").await?;
1875    ///
1876    /// // Spawn timeout task
1877    /// let interrupt_handle = client.interrupt_handle();
1878    /// tokio::spawn(async move {
1879    ///     tokio::time::sleep(Duration::from_secs(10)).await;
1880    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
1881    /// });
1882    ///
1883    /// while let Some(_block) = client.receive().await? {
1884    ///     // Process until timeout
1885    /// }
1886    /// # Ok(())
1887    /// # }
1888    /// ```
1889    pub fn interrupt(&self) {
1890        // Set interrupt flag using SeqCst for immediate visibility across all threads
1891        self.interrupted.store(true, Ordering::SeqCst);
1892    }
1893
1894    /// Returns a clone of the interrupt handle for thread-safe cancellation.
1895    ///
1896    /// This method provides access to the shared `Arc<AtomicBool>` interrupt flag,
1897    /// allowing it to be used from other threads or async tasks to signal cancellation.
1898    ///
1899    /// # Returns
1900    ///
1901    /// A cloned `Arc<AtomicBool>` that can be used to interrupt operations from any thread.
1902    ///
1903    /// # Examples
1904    ///
1905    /// ```rust,no_run
1906    /// # use open_agent::{Client, AgentOptions};
1907    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1908    /// let mut client = Client::new(AgentOptions::default())?;
1909    /// let interrupt_handle = client.interrupt_handle();
1910    ///
1911    /// // Use from another thread
1912    /// tokio::spawn(async move {
1913    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1914    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
1915    /// });
1916    /// # Ok(())
1917    /// # }
1918    /// ```
1919    pub fn interrupt_handle(&self) -> Arc<AtomicBool> {
1920        self.interrupted.clone()
1921    }
1922
1923    /// Returns a reference to the conversation history.
1924    ///
1925    /// The history contains all messages exchanged in the conversation, including:
1926    /// - User messages
1927    /// - Assistant messages (with text and tool use blocks)
1928    /// - Tool result messages
1929    ///
1930    /// # Returns
1931    ///
1932    /// A slice of `Message` objects in chronological order.
1933    ///
1934    /// # Use Cases
1935    ///
1936    /// - Inspecting conversation context
1937    /// - Debugging tool execution flow
1938    /// - Saving conversation state
1939    /// - Implementing custom history management
1940    ///
1941    /// # Examples
1942    ///
1943    /// ```rust
1944    /// # use open_agent::{Client, AgentOptions};
1945    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1946    /// let client = Client::new(AgentOptions::default())?;
1947    ///
1948    /// // Initially empty
1949    /// assert_eq!(client.history().len(), 0);
1950    /// # Ok(())
1951    /// # }
1952    /// ```
1953    pub fn history(&self) -> &[Message] {
1954        &self.history
1955    }
1956
1957    /// Returns a mutable reference to the conversation history.
1958    ///
1959    /// This allows you to modify the history directly for advanced use cases like:
1960    /// - Removing old messages to manage context length
1961    /// - Editing messages for retry scenarios
1962    /// - Injecting synthetic messages for testing
1963    ///
1964    /// # Warning
1965    ///
1966    /// Modifying history directly can lead to inconsistent conversation state if not
1967    /// done carefully. The SDK expects history to follow the proper message flow
1968    /// (user → assistant → tool results → assistant, etc.).
1969    ///
1970    /// # Examples
1971    ///
1972    /// ```rust
1973    /// # use open_agent::{Client, AgentOptions};
1974    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1975    /// let mut client = Client::new(AgentOptions::default())?;
1976    ///
1977    /// // Remove oldest messages to stay within context limit
1978    /// if client.history().len() > 50 {
1979    ///     client.history_mut().drain(0..10);
1980    /// }
1981    /// # Ok(())
1982    /// # }
1983    /// ```
1984    pub fn history_mut(&mut self) -> &mut Vec<Message> {
1985        &mut self.history
1986    }
1987
1988    /// Returns a reference to the agent configuration options.
1989    ///
1990    /// Provides read-only access to the `AgentOptions` used to configure this client.
1991    ///
1992    /// # Use Cases
1993    ///
1994    /// - Inspecting current configuration
1995    /// - Debugging issues
1996    /// - Conditional logic based on settings
1997    ///
1998    /// # Examples
1999    ///
2000    /// ```rust
2001    /// # use open_agent::{Client, AgentOptions};
2002    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2003    /// let client = Client::new(AgentOptions::builder()
2004    ///     .model("gpt-4")
2005    ///     .base_url("http://localhost:1234/v1")
2006    ///     .build()?)?;
2007    ///
2008    /// println!("Using model: {}", client.options().model());
2009    /// # Ok(())
2010    /// # }
2011    /// ```
2012    pub fn options(&self) -> &AgentOptions {
2013        &self.options
2014    }
2015
2016    /// Clears all conversation history.
2017    ///
2018    /// This resets the conversation to a blank slate while preserving the client
2019    /// configuration (tools, hooks, model, etc.). The next message will start a
2020    /// fresh conversation with no prior context.
2021    ///
2022    /// # State Changes
2023    ///
2024    /// - Clears `history` vector
2025    /// - Does NOT modify current stream, options, or other state
2026    ///
2027    /// # Use Cases
2028    ///
2029    /// - Starting a new conversation
2030    /// - Preventing context length issues
2031    /// - Clearing sensitive data
2032    /// - Implementing conversation sessions
2033    ///
2034    /// # Examples
2035    ///
2036    /// ```rust,no_run
2037    /// # use open_agent::{Client, AgentOptions, ContentBlock};
2038    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2039    /// let mut client = Client::new(AgentOptions::default())?;
2040    ///
2041    /// // First conversation
2042    /// client.send("Hello").await?;
2043    /// while let Some(_) = client.receive().await? {}
2044    ///
2045    /// // Clear and start fresh
2046    /// client.clear_history();
2047    ///
2048    /// // New conversation with no memory of previous
2049    /// client.send("Hello again").await?;
2050    /// # Ok(())
2051    /// # }
2052    /// ```
2053    pub fn clear_history(&mut self) {
2054        self.history.clear();
2055    }
2056
2057    /// Adds a tool result to the conversation history for manual tool execution.
2058    ///
2059    /// This method is used exclusively in **manual mode** after receiving a `ToolUseBlock`.
2060    /// The workflow is:
2061    ///
2062    /// 1. `receive()` returns a `ToolUseBlock`
2063    /// 2. You execute the tool yourself
2064    /// 3. Call `add_tool_result()` with the tool's output
2065    /// 4. Call `send("")` to continue the conversation
2066    /// 5. The model receives the tool result and generates a response
2067    ///
2068    /// # Parameters
2069    ///
2070    /// - `tool_use_id`: The unique ID from the `ToolUseBlock` (must match exactly)
2071    /// - `content`: The tool's output as a JSON value
2072    ///
2073    /// # Behavior
2074    ///
2075    /// Creates a `ToolResultBlock` and adds it to conversation history as a tool message.
2076    /// This preserves the tool call/result pairing that the model needs to understand
2077    /// the conversation flow.
2078    ///
2079    /// # State Changes
2080    ///
2081    /// - Appends a tool message to `history`
2082    /// - Does NOT modify stream or trigger any requests
2083    ///
2084    /// # Important Notes
2085    ///
2086    /// - **Not used in auto mode**: Auto-execution handles tool results automatically
2087    /// - **ID must match**: The `tool_use_id` must match the ID from the `ToolUseBlock`
2088    /// - **No validation**: This method doesn't validate the result format
2089    /// - **Must call send()**: After adding result(s), call `send("")` to continue
2090    ///
2091    /// # Examples
2092    ///
2093    /// ## Basic Manual Tool Execution
2094    ///
2095    /// ```rust,no_run
2096    /// use open_agent::{Client, AgentOptions, ContentBlock};
2097    /// use serde_json::json;
2098    ///
2099    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2100    /// let mut client = Client::new(AgentOptions::default())?;
2101    /// client.send("Use the calculator").await?;
2102    ///
2103    /// while let Some(block) = client.receive().await? {
2104    ///     match block {
2105    ///         ContentBlock::ToolUse(tool_use) => {
2106    ///             // Execute tool manually
2107    ///             let result = json!({"result": 42});
2108    ///
2109    ///             // Add result to history
2110    ///             client.add_tool_result(&tool_use.id, result)?;
2111    ///
2112    ///             // Continue conversation to get model's response
2113    ///             client.send("").await?;
2114    ///         }
2115    ///         ContentBlock::Text(text) => {
2116    ///             println!("{}", text.text);
2117    ///         }
2118    ///         _ => {}
2119    ///     }
2120    /// }
2121    /// # Ok(())
2122    /// # }
2123    /// ```
2124    ///
2125    /// ## Handling Tool Errors
2126    ///
2127    /// ```rust,no_run
2128    /// use open_agent::{Client, AgentOptions, ContentBlock};
2129    /// use serde_json::json;
2130    ///
2131    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2132    /// # let mut client = Client::new(AgentOptions::default())?;
2133    /// # client.send("test").await?;
2134    /// while let Some(block) = client.receive().await? {
2135    ///     if let ContentBlock::ToolUse(tool_use) = block {
2136    ///         // Try to execute tool
2137    ///         let result = match execute_tool(&tool_use.name, &tool_use.input) {
2138    ///             Ok(output) => output,
2139    ///             Err(e) => json!({
2140    ///                 "error": e.to_string(),
2141    ///                 "tool": tool_use.name
2142    ///             })
2143    ///         };
2144    ///
2145    ///         client.add_tool_result(&tool_use.id, result)?;
2146    ///         client.send("").await?;
2147    ///     }
2148    /// }
2149    ///
2150    /// # fn execute_tool(name: &str, input: &serde_json::Value) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
2151    /// #     Ok(json!({}))
2152    /// # }
2153    /// # Ok(())
2154    /// # }
2155    /// ```
2156    ///
2157    /// ## Multiple Tool Calls
2158    ///
2159    /// ```rust,no_run
2160    /// use open_agent::{Client, AgentOptions, ContentBlock};
2161    /// use serde_json::json;
2162    ///
2163    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2164    /// # let mut client = Client::new(AgentOptions::default())?;
2165    /// client.send("Calculate 2+2 and 3+3").await?;
2166    ///
2167    /// let mut tool_calls = Vec::new();
2168    ///
2169    /// // Collect all tool calls
2170    /// while let Some(block) = client.receive().await? {
2171    ///     if let ContentBlock::ToolUse(tool_use) = block {
2172    ///         tool_calls.push(tool_use);
2173    ///     }
2174    /// }
2175    ///
2176    /// // Execute and add results for all tools
2177    /// for tool_call in tool_calls {
2178    ///     let result = json!({"result": 42}); // Execute tool
2179    ///     client.add_tool_result(&tool_call.id, result)?;
2180    /// }
2181    ///
2182    /// // Continue conversation
2183    /// client.send("").await?;
2184    /// # Ok(())
2185    /// # }
2186    /// ```
2187    pub fn add_tool_result(&mut self, tool_use_id: &str, content: serde_json::Value) -> Result<()> {
2188        use crate::types::ToolResultBlock;
2189
2190        // Create a tool result block with the given ID and content
2191        let result_block = ToolResultBlock::new(tool_use_id, content);
2192
2193        // Add to history as a tool message
2194        // Note: Currently using a simplified representation with TextBlock
2195        // TODO: Properly serialize ToolResultBlock for OpenAI format
2196        let serialized = serde_json::to_string(&result_block.content)
2197            .map_err(|e| Error::config(format!("Failed to serialize tool result: {}", e)))?;
2198
2199        self.history.push(Message::new(
2200            MessageRole::Tool,
2201            vec![ContentBlock::Text(TextBlock::new(serialized))],
2202        ));
2203
2204        Ok(())
2205    }
2206
2207    /// Looks up a registered tool by name.
2208    ///
2209    /// This method provides access to the tool registry for manual execution scenarios.
2210    /// It searches the tools registered in `AgentOptions` and returns a reference to
2211    /// the matching tool if found.
2212    ///
2213    /// # Parameters
2214    ///
2215    /// - `name`: The tool name to search for (case-sensitive)
2216    ///
2217    /// # Returns
2218    ///
2219    /// - `Some(&Tool)`: Tool found
2220    /// - `None`: No tool with that name
2221    ///
2222    /// # Use Cases
2223    ///
2224    /// - Manual tool execution in response to `ToolUseBlock`
2225    /// - Validating tool availability before offering features
2226    /// - Inspecting tool metadata (name, description, schema)
2227    ///
2228    /// # Examples
2229    ///
2230    /// ## Execute Tool Manually
2231    ///
2232    /// ```rust,no_run
2233    /// use open_agent::{Client, AgentOptions, ContentBlock};
2234    ///
2235    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2236    /// # let mut client = Client::new(AgentOptions::default())?;
2237    /// # client.send("test").await?;
2238    /// while let Some(block) = client.receive().await? {
2239    ///     if let ContentBlock::ToolUse(tool_use) = block {
2240    ///         if let Some(tool) = client.get_tool(&tool_use.name) {
2241    ///             // Execute the tool
2242    ///             let result = tool.execute(tool_use.input.clone()).await?;
2243    ///             client.add_tool_result(&tool_use.id, result)?;
2244    ///             client.send("").await?;
2245    ///         } else {
2246    ///             println!("Unknown tool: {}", tool_use.name);
2247    ///         }
2248    ///     }
2249    /// }
2250    /// # Ok(())
2251    /// # }
2252    /// ```
2253    ///
2254    /// ## Check Tool Availability
2255    ///
2256    /// ```rust
2257    /// # use open_agent::{Client, AgentOptions};
2258    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2259    /// # let client = Client::new(AgentOptions::default())?;
2260    /// if client.get_tool("calculator").is_some() {
2261    ///     println!("Calculator is available");
2262    /// }
2263    /// # Ok(())
2264    /// # }
2265    /// ```
2266    pub fn get_tool(&self, name: &str) -> Option<&crate::tools::Tool> {
2267        // Search registered tools by name
2268        self.options
2269            .tools()
2270            .iter()
2271            .find(|t| t.name() == name)
2272            .map(|t| t.as_ref())
2273    }
2274}
2275
2276#[cfg(test)]
2277mod tests {
2278    use super::*;
2279
2280    #[test]
2281    fn test_client_creation() {
2282        let options = AgentOptions::builder()
2283            .system_prompt("Test")
2284            .model("test-model")
2285            .base_url("http://localhost:1234/v1")
2286            .build()
2287            .unwrap();
2288
2289        let client = Client::new(options).expect("Should create client successfully");
2290        assert_eq!(client.history().len(), 0);
2291    }
2292
2293    #[test]
2294    fn test_client_new_returns_result() {
2295        // Test that Client::new() returns Result instead of panicking
2296        let options = AgentOptions::builder()
2297            .system_prompt("Test")
2298            .model("test-model")
2299            .base_url("http://localhost:1234/v1")
2300            .build()
2301            .unwrap();
2302
2303        // This should not panic - it should return Ok(client)
2304        let result = Client::new(options);
2305        assert!(result.is_ok(), "Client::new() should return Ok");
2306
2307        let client = result.unwrap();
2308        assert_eq!(client.history().len(), 0);
2309    }
2310
2311    #[test]
2312    fn test_interrupt_flag_initial_state() {
2313        let options = AgentOptions::builder()
2314            .system_prompt("Test")
2315            .model("test-model")
2316            .base_url("http://localhost:1234/v1")
2317            .build()
2318            .unwrap();
2319
2320        let client = Client::new(options).expect("Should create client successfully");
2321        // Initially not interrupted
2322        assert!(!client.interrupted.load(Ordering::SeqCst));
2323    }
2324
2325    #[test]
2326    fn test_interrupt_sets_flag() {
2327        let options = AgentOptions::builder()
2328            .system_prompt("Test")
2329            .model("test-model")
2330            .base_url("http://localhost:1234/v1")
2331            .build()
2332            .unwrap();
2333
2334        let client = Client::new(options).expect("Should create client successfully");
2335        client.interrupt();
2336        assert!(client.interrupted.load(Ordering::SeqCst));
2337    }
2338
2339    #[test]
2340    fn test_interrupt_idempotent() {
2341        let options = AgentOptions::builder()
2342            .system_prompt("Test")
2343            .model("test-model")
2344            .base_url("http://localhost:1234/v1")
2345            .build()
2346            .unwrap();
2347
2348        let client = Client::new(options).expect("Should create client successfully");
2349        client.interrupt();
2350        assert!(client.interrupted.load(Ordering::SeqCst));
2351
2352        // Call again - should still be interrupted
2353        client.interrupt();
2354        assert!(client.interrupted.load(Ordering::SeqCst));
2355    }
2356
2357    #[tokio::test]
2358    async fn test_receive_returns_none_when_interrupted() {
2359        let options = AgentOptions::builder()
2360            .system_prompt("Test")
2361            .model("test-model")
2362            .base_url("http://localhost:1234/v1")
2363            .build()
2364            .unwrap();
2365
2366        let mut client = Client::new(options).expect("Should create client successfully");
2367
2368        // Interrupt before receiving
2369        client.interrupt();
2370
2371        // NEW SIGNATURE: receive() should return Ok(None) when interrupted
2372        let result = client.receive().await;
2373        assert!(result.is_ok());
2374        assert!(result.unwrap().is_none());
2375    }
2376
2377    #[tokio::test]
2378    async fn test_receive_returns_ok_none_when_no_stream() {
2379        let options = AgentOptions::builder()
2380            .system_prompt("Test")
2381            .model("test-model")
2382            .base_url("http://localhost:1234/v1")
2383            .build()
2384            .unwrap();
2385
2386        let mut client = Client::new(options).expect("Should create client successfully");
2387
2388        // No stream started - receive() should return Ok(None)
2389        let result = client.receive().await;
2390        assert!(result.is_ok());
2391        assert!(result.unwrap().is_none());
2392    }
2393
2394    #[tokio::test]
2395    async fn test_receive_error_propagation() {
2396        // This test demonstrates that errors are wrapped in Err(), not Some(Err())
2397        // We'll verify this behavior when we have a mock stream that produces errors
2398        let options = AgentOptions::builder()
2399            .system_prompt("Test")
2400            .model("test-model")
2401            .base_url("http://localhost:1234/v1")
2402            .build()
2403            .unwrap();
2404
2405        let client = Client::new(options).expect("Should create client successfully");
2406
2407        // Signature check: receive() returns Result<Option<ContentBlock>>
2408        // This means we can use ? operator cleanly:
2409        // while let Some(block) = client.receive().await? { ... }
2410
2411        // Type assertion to ensure signature is correct
2412        let _: Result<Option<ContentBlock>> = std::future::ready(Ok(None)).await;
2413        drop(client);
2414    }
2415}