open_agent/client.rs
1//! Client for streaming queries and multi-turn conversations
2//!
3//! This module provides the core streaming client implementation for the Open Agent SDK.
4//! It handles communication with OpenAI-compatible APIs, manages conversation history,
5//! and provides two modes of operation: manual and automatic tool execution.
6//!
7//! # Architecture Overview
8//!
9//! The SDK implements a **streaming-first architecture** where all responses from the model
10//! are received as a stream of content blocks. This design enables:
11//!
12//! - **Progressive rendering**: Display text as it arrives without waiting for completion
13//! - **Real-time tool execution**: Execute tools as they're requested by the model
14//! - **Interruption support**: Cancel operations mid-stream without corrupting state
15//! - **Memory efficiency**: Process large responses without buffering everything in memory
16//!
17//! ## Two Operating Modes
18//!
19//! ### 1. Manual Tool Execution Mode (default)
20//!
21//! In manual mode, the client streams content blocks directly to the caller. When the model
22//! requests a tool, the caller receives a `ToolUseBlock`, executes the tool, adds the result
23//! using `add_tool_result()`, and continues the conversation with another `send()` call.
24//!
25//! **Use cases**: Custom tool execution logic, interactive debugging, fine-grained control
26//!
27//! ### 2. Automatic Tool Execution Mode
28//!
29//! When `auto_execute_tools` is enabled, the client automatically executes tools and continues
30//! the conversation until receiving a text-only response. The caller only receives the final
31//! text blocks after all tool iterations complete.
32//!
33//! **Use cases**: Simple agentic workflows, automated task completion, batch processing
34//!
35//! ## Request Flow
36//!
37//! ```text
38//! User sends prompt
39//! │
40//! ├─> UserPromptSubmit hook executes (can modify/block prompt)
41//! │
42//! ├─> Prompt added to history
43//! │
44//! ├─> HTTP request to OpenAI-compatible API
45//! │
46//! ├─> Response streamed as Server-Sent Events (SSE)
47//! │
48//! ├─> SSE chunks aggregated into ContentBlocks
49//! │
50//! └─> Blocks emitted to caller (or buffered for auto-execution)
51//! ```
52//!
53//! ## Tool Execution Flow
54//!
55//! ### Manual Mode:
56//! ```text
57//! receive() → ToolUseBlock
58//! │
59//! ├─> Caller executes tool
60//! │
61//! ├─> Caller calls add_tool_result()
62//! │
63//! ├─> Caller calls send("") to continue
64//! │
65//! └─> receive() → TextBlock (model's response)
66//! ```
67//!
68//! ### Auto Mode:
69//! ```text
70//! receive() triggers auto-execution loop
71//! │
72//! ├─> Collect all blocks from stream
73//! │
74//! ├─> For each ToolUseBlock:
75//! │ ├─> PreToolUse hook executes (can modify/block)
76//! │ ├─> Tool executed via Tool.execute()
77//! │ ├─> PostToolUse hook executes (can modify result)
78//! │ └─> Result added to history
79//! │
80//! ├─> Continue conversation with send("")
81//! │
82//! ├─> Repeat until text-only response or max iterations
83//! │
84//! └─> Return text blocks one-by-one via receive()
85//! ```
86//!
87//! ## State Management
88//!
89//! The client maintains several pieces of state:
90//!
91//! - **history**: Full conversation history (`Vec<Message>`)
92//! - **current_stream**: Active SSE stream being consumed (`Option<ContentStream>`)
93//! - **interrupted**: Atomic flag for cancellation (`Arc<AtomicBool>`)
94//! - **auto_exec_buffer**: Buffered blocks for auto-execution mode (`Vec<ContentBlock>`)
95//! - **auto_exec_index**: Current position in buffer (usize)
96//!
97//! ## Interruption Mechanism
98//!
99//! The interrupt system uses `Arc<AtomicBool>` to enable safe, thread-safe cancellation:
100//!
101//! ```rust,no_run
102//! # use open_agent::{Client, AgentOptions};
103//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
104//! let mut client = Client::new(AgentOptions::default())?;
105//! let handle = client.interrupt_handle(); // Clone Arc for use in other threads
106//!
107//! // In another thread or async task:
108//! tokio::spawn(async move {
109//! tokio::time::sleep(std::time::Duration::from_secs(5)).await;
110//! handle.store(true, std::sync::atomic::Ordering::SeqCst);
111//! });
112//!
113//! client.send("Long request").await?;
114//! while let Some(block) = client.receive().await? {
115//! // Will stop when interrupted
116//! }
117//! # Ok(())
118//! # }
119//! ```
120//!
121//! ## Hook Integration
122//!
123//! Hooks provide extension points throughout the request lifecycle:
124//!
125//! - **UserPromptSubmit**: Called before sending user prompt (can modify or block)
126//! - **PreToolUse**: Called before executing each tool (can modify input or block execution)
127//! - **PostToolUse**: Called after tool execution (can modify result)
128//!
129//! Hooks are only invoked in specific scenarios and have access to conversation history.
130//!
131//! ## Error Handling
132//!
133//! Errors are propagated immediately and leave the client in a valid state:
134//!
135//! - **HTTP errors**: Network failures, timeouts, connection issues
136//! - **API errors**: Invalid model, authentication failures, rate limits
137//! - **Parse errors**: Malformed SSE responses, invalid JSON
138//! - **Tool errors**: Tool execution failures (converted to JSON error responses)
139//! - **Hook errors**: Hook execution failures or blocked operations
140//!
141//! After an error, the client remains usable for new requests.
142//!
143//! # Examples
144//!
145//! ## Simple Single-Turn Query
146//!
147//! ```rust,no_run
148//! use open_agent::{query, AgentOptions};
149//! use futures::StreamExt;
150//!
151//! #[tokio::main]
152//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
153//! let options = AgentOptions::builder()
154//! .model("gpt-4")
155//! .api_key("sk-...")
156//! .build()?;
157//!
158//! let mut stream = query("What is Rust?", &options).await?;
159//!
160//! while let Some(block) = stream.next().await {
161//! if let open_agent::ContentBlock::Text(text) = block? {
162//! print!("{}", text.text);
163//! }
164//! }
165//!
166//! Ok(())
167//! }
168//! ```
169//!
170//! ## Multi-Turn Conversation
171//!
172//! ```rust,no_run
173//! use open_agent::{Client, AgentOptions, ContentBlock};
174//! use futures::StreamExt;
175//!
176//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
177//! let mut client = Client::new(AgentOptions::builder()
178//! .model("gpt-4")
179//! .api_key("sk-...")
180//! .build()?)?;
181//!
182//! // First question
183//! client.send("What's the capital of France?").await?;
184//! while let Some(block) = client.receive().await? {
185//! if let ContentBlock::Text(text) = block {
186//! println!("{}", text.text);
187//! }
188//! }
189//!
190//! // Follow-up question (history is maintained)
191//! client.send("What's its population?").await?;
192//! while let Some(block) = client.receive().await? {
193//! if let ContentBlock::Text(text) = block {
194//! println!("{}", text.text);
195//! }
196//! }
197//! # Ok(())
198//! # }
199//! ```
200//!
201//! ## Manual Tool Execution
202//!
203//! ```rust,no_run
204//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
205//! use serde_json::json;
206//!
207//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
208//! let calculator = Tool::new(
209//! "calculator",
210//! "Performs arithmetic operations",
211//! json!({"type": "object", "properties": {"operation": {"type": "string"}}}),
212//! |input| Box::pin(async move {
213//! // Custom execution logic
214//! Ok(json!({"result": 42}))
215//! })
216//! );
217//!
218//! let mut client = Client::new(AgentOptions::builder()
219//! .model("gpt-4")
220//! .api_key("sk-...")
221//! .tools(vec![calculator])
222//! .build()?)?;
223//!
224//! client.send("Calculate 2+2").await?;
225//!
226//! while let Some(block) = client.receive().await? {
227//! match block {
228//! ContentBlock::ToolUse(tool_use) => {
229//! println!("Model wants to use: {}", tool_use.name);
230//!
231//! // Execute tool manually
232//! let tool = client.get_tool(&tool_use.name).unwrap();
233//! let result = tool.execute(tool_use.input).await?;
234//!
235//! // Add result and continue
236//! client.add_tool_result(&tool_use.id, result)?;
237//! client.send("").await?;
238//! }
239//! ContentBlock::Text(text) => {
240//! println!("Response: {}", text.text);
241//! }
242//! _ => {}
243//! }
244//! }
245//! # Ok(())
246//! # }
247//! ```
248//!
249//! ## Automatic Tool Execution
250//!
251//! ```rust,no_run
252//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
253//! use serde_json::json;
254//!
255//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
256//! let calculator = Tool::new(
257//! "calculator",
258//! "Performs arithmetic operations",
259//! json!({"type": "object"}),
260//! |input| Box::pin(async move { Ok(json!({"result": 42})) })
261//! );
262//!
263//! let mut client = Client::new(AgentOptions::builder()
264//! .model("gpt-4")
265//! .api_key("sk-...")
266//! .tools(vec![calculator])
267//! .auto_execute_tools(true) // Enable auto-execution
268//! .max_tool_iterations(5) // Max 5 tool rounds
269//! .build()?)?;
270//!
271//! client.send("Calculate 2+2 and then multiply by 3").await?;
272//!
273//! // Tools are executed automatically - you only get final text response
274//! while let Some(block) = client.receive().await? {
275//! if let ContentBlock::Text(text) = block {
276//! println!("{}", text.text);
277//! }
278//! }
279//! # Ok(())
280//! # }
281//! ```
282//!
283//! ## With Hooks
284//!
285//! ```ignore
286//! use open_agent::{Client, AgentOptions, Hooks, HookDecision};
287//!
288//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
289//! let hooks = Hooks::new()
290//! .add_user_prompt_submit(|event| async move {
291//! // Block prompts containing certain words
292//! if event.prompt.contains("forbidden") {
293//! return Some(HookDecision::block("Forbidden word detected"));
294//! }
295//! Some(HookDecision::continue_())
296//! })
297//! .add_pre_tool_use(|event| async move {
298//! // Log all tool uses
299//! println!("Executing tool: {}", event.tool_name);
300//! Some(HookDecision::continue_())
301//! });
302//!
303//! let mut client = Client::new(AgentOptions::builder()
304//! .model("gpt-4")
305//! .base_url("http://localhost:1234/v1")
306//! .hooks(hooks)
307//! .build()?)?;
308//!
309//! // Hooks will be executed automatically
310//! client.send("Hello!").await?;
311//! # Ok(())
312//! # }
313//! ```
314
315use crate::types::{
316 AgentOptions, ContentBlock, Message, MessageRole, OpenAIMessage, OpenAIRequest, TextBlock,
317};
318use crate::utils::{ToolCallAggregator, parse_sse_stream};
319use crate::{Error, Result};
320use futures::stream::{Stream, StreamExt};
321use std::pin::Pin;
322use std::sync::Arc;
323use std::sync::atomic::{AtomicBool, Ordering};
324use std::time::Duration;
325
326/// A pinned, boxed stream of content blocks from the model.
327///
328/// This type alias represents an asynchronous stream that yields `ContentBlock` items.
329/// Each item is wrapped in a `Result` to handle potential errors during streaming.
330///
331/// The stream is:
332/// - **Pinned** (`Pin<Box<...>>`): Required for safe async operations and self-referential types
333/// - **Boxed**: Allows dynamic dispatch and hides the concrete stream implementation
334/// - **Send**: Can be safely transferred between threads
335///
336/// # Content Blocks
337///
338/// The stream can yield several types of content blocks:
339///
340/// - **TextBlock**: Incremental text responses from the model
341/// - **ToolUseBlock**: Requests to execute a tool with specific parameters
342/// - **ToolResultBlock**: Results from tool execution (in manual mode)
343///
344/// # Error Handling
345///
346/// Errors in the stream indicate issues like:
347/// - Network failures or timeouts
348/// - Malformed SSE events
349/// - JSON parsing errors
350/// - API errors from the model provider
351///
352/// When an error occurs, the stream typically terminates. It's the caller's responsibility
353/// to handle errors appropriately.
354///
355/// # Examples
356///
357/// ```rust,no_run
358/// use open_agent::{query, AgentOptions, ContentBlock};
359/// use futures::StreamExt;
360///
361/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
362/// let options = AgentOptions::builder()
363/// .model("gpt-4")
364/// .api_key("sk-...")
365/// .build()?;
366///
367/// let mut stream = query("Hello!", &options).await?;
368///
369/// while let Some(result) = stream.next().await {
370/// match result {
371/// Ok(ContentBlock::Text(text)) => print!("{}", text.text),
372/// Ok(_) => {}, // Other block types
373/// Err(e) => eprintln!("Stream error: {}", e),
374/// }
375/// }
376/// # Ok(())
377/// # }
378/// ```
379pub type ContentStream = Pin<Box<dyn Stream<Item = Result<ContentBlock>> + Send>>;
380
381/// Simple query function for single-turn interactions without conversation history.
382///
383/// This is a stateless convenience function for simple queries that don't require
384/// multi-turn conversations. It creates a temporary HTTP client, sends a single
385/// prompt, and returns a stream of content blocks.
386///
387/// For multi-turn conversations or more control over the interaction, use [`Client`] instead.
388///
389/// # Parameters
390///
391/// - `prompt`: The user's message to send to the model
392/// - `options`: Configuration including model, API key, tools, etc.
393///
394/// # Returns
395///
396/// Returns a `ContentStream` that yields content blocks as they arrive from the model.
397/// The stream must be polled to completion to receive all blocks.
398///
399/// # Behavior
400///
401/// 1. Creates a temporary HTTP client with configured timeout
402/// 2. Builds message array (system prompt + user prompt)
403/// 3. Converts tools to OpenAI format if provided
404/// 4. Makes HTTP POST request to `/chat/completions`
405/// 5. Parses Server-Sent Events (SSE) response stream
406/// 6. Aggregates chunks into complete content blocks
407/// 7. Returns stream that yields blocks as they complete
408///
409/// # Error Handling
410///
411/// This function can return errors for:
412/// - HTTP client creation failures
413/// - Network errors during the request
414/// - API errors (authentication, invalid model, rate limits, etc.)
415/// - SSE parsing errors
416/// - JSON deserialization errors
417///
418/// # Performance Notes
419///
420/// - Creates a new HTTP client for each call (consider using `Client` for repeated queries)
421/// - Timeout is configurable via `AgentOptions::timeout` (default: 120 seconds)
422/// - Streaming begins immediately; no buffering of the full response
423///
424/// # Examples
425///
426/// ## Basic Usage
427///
428/// ```rust,no_run
429/// use open_agent::{query, AgentOptions};
430/// use futures::StreamExt;
431///
432/// #[tokio::main]
433/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
434/// let options = AgentOptions::builder()
435/// .system_prompt("You are a helpful assistant")
436/// .model("gpt-4")
437/// .api_key("sk-...")
438/// .build()?;
439///
440/// let mut stream = query("What's the capital of France?", &options).await?;
441///
442/// while let Some(block) = stream.next().await {
443/// match block? {
444/// open_agent::ContentBlock::Text(text) => {
445/// print!("{}", text.text);
446/// }
447/// _ => {}
448/// }
449/// }
450///
451/// Ok(())
452/// }
453/// ```
454///
455/// ## With Tools
456///
457/// ```rust,no_run
458/// use open_agent::{query, AgentOptions, Tool, ContentBlock};
459/// use futures::StreamExt;
460/// use serde_json::json;
461///
462/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
463/// let calculator = Tool::new(
464/// "calculator",
465/// "Performs calculations",
466/// json!({"type": "object"}),
467/// |input| Box::pin(async move { Ok(json!({"result": 42})) })
468/// );
469///
470/// let options = AgentOptions::builder()
471/// .model("gpt-4")
472/// .api_key("sk-...")
473/// .tools(vec![calculator])
474/// .build()?;
475///
476/// let mut stream = query("Calculate 2+2", &options).await?;
477///
478/// while let Some(block) = stream.next().await {
479/// match block? {
480/// ContentBlock::ToolUse(tool_use) => {
481/// println!("Model wants to use: {}", tool_use.name);
482/// // Note: You'll need to manually execute tools and continue
483/// // the conversation. For automatic execution, use Client.
484/// }
485/// ContentBlock::Text(text) => print!("{}", text.text),
486/// _ => {}
487/// }
488/// }
489/// # Ok(())
490/// # }
491/// ```
492///
493/// ## Error Handling
494///
495/// ```rust,no_run
496/// use open_agent::{query, AgentOptions};
497/// use futures::StreamExt;
498///
499/// # async fn example() {
500/// let options = AgentOptions::builder()
501/// .model("gpt-4")
502/// .api_key("invalid-key")
503/// .build()
504/// .unwrap();
505///
506/// match query("Hello", &options).await {
507/// Ok(mut stream) => {
508/// while let Some(result) = stream.next().await {
509/// match result {
510/// Ok(block) => println!("Block: {:?}", block),
511/// Err(e) => {
512/// eprintln!("Stream error: {}", e);
513/// break;
514/// }
515/// }
516/// }
517/// }
518/// Err(e) => eprintln!("Query failed: {}", e),
519/// }
520/// # }
521/// ```
522pub async fn query(prompt: &str, options: &AgentOptions) -> Result<ContentStream> {
523 // Create HTTP client with configured timeout
524 // The timeout applies to the entire request, not individual chunks
525 let client = reqwest::Client::builder()
526 .timeout(Duration::from_secs(options.timeout()))
527 .build()
528 .map_err(Error::Http)?;
529
530 // Build messages array for the API request
531 // OpenAI format expects an array of message objects with role and content
532 let mut messages = Vec::new();
533
534 // Add system prompt if provided
535 // System prompts set the assistant's behavior and context
536 if !options.system_prompt().is_empty() {
537 messages.push(OpenAIMessage {
538 role: "system".to_string(),
539 content: options.system_prompt().to_string(),
540 tool_calls: None,
541 tool_call_id: None,
542 });
543 }
544
545 // Add user prompt
546 // This is the actual query from the user
547 messages.push(OpenAIMessage {
548 role: "user".to_string(),
549 content: prompt.to_string(),
550 tool_calls: None,
551 tool_call_id: None,
552 });
553
554 // Convert tools to OpenAI format if any are provided
555 // Tools are described using JSON Schema for parameter validation
556 let tools = if !options.tools().is_empty() {
557 Some(
558 options
559 .tools()
560 .iter()
561 .map(|t| t.to_openai_format())
562 .collect(),
563 )
564 } else {
565 None
566 };
567
568 // Build the OpenAI-compatible request payload
569 // stream=true enables Server-Sent Events for incremental responses
570 let request = OpenAIRequest {
571 model: options.model().to_string(),
572 messages,
573 stream: true, // Critical: enables SSE streaming
574 max_tokens: options.max_tokens(),
575 temperature: Some(options.temperature()),
576 tools,
577 };
578
579 // Make HTTP POST request to the chat completions endpoint
580 let url = format!("{}/chat/completions", options.base_url());
581 let response = client
582 .post(&url)
583 .header("Authorization", format!("Bearer {}", options.api_key()))
584 .header("Content-Type", "application/json")
585 .json(&request)
586 .send()
587 .await
588 .map_err(Error::Http)?;
589
590 // Check for HTTP-level errors before processing the stream
591 // This catches authentication failures, rate limits, invalid models, etc.
592 if !response.status().is_success() {
593 let status = response.status();
594 let body = response.text().await.unwrap_or_else(|e| {
595 eprintln!("WARNING: Failed to read error response body: {}", e);
596 "Unknown error (failed to read response body)".to_string()
597 });
598 return Err(Error::api(format!("API error {}: {}", status, body)));
599 }
600
601 // Parse the Server-Sent Events (SSE) stream
602 // The response body is a stream of "data: {...}" events
603 let sse_stream = parse_sse_stream(response);
604
605 // Aggregate SSE chunks into complete content blocks
606 // ToolCallAggregator handles partial JSON and assembles complete tool calls
607 // The scan() combinator maintains state across stream items
608 let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
609 let result = match chunk_result {
610 Ok(chunk) => match aggregator.process_chunk(chunk) {
611 Ok(blocks) => {
612 if blocks.is_empty() {
613 Some(None) // Intermediate chunk, continue streaming
614 } else {
615 Some(Some(Ok(blocks))) // Complete block(s) ready
616 }
617 }
618 Err(e) => Some(Some(Err(e))), // Propagate processing error
619 },
620 Err(e) => Some(Some(Err(e))), // Propagate stream error
621 };
622 futures::future::ready(result)
623 });
624
625 // Flatten the stream to emit individual blocks
626 // filter_map removes None values (incomplete chunks)
627 // flat_map expands Vec<ContentBlock> into individual items
628 let flattened = stream
629 .filter_map(|item| async move { item })
630 .flat_map(|result| {
631 futures::stream::iter(match result {
632 Ok(blocks) => blocks.into_iter().map(Ok).collect(),
633 Err(e) => vec![Err(e)],
634 })
635 });
636
637 // Pin and box the stream for type erasure and safe async usage
638 Ok(Box::pin(flattened))
639}
640
641/// Stateful client for multi-turn conversations with automatic history management.
642///
643/// The `Client` is the primary interface for building conversational AI applications.
644/// It maintains conversation history, manages streaming responses, and provides two
645/// modes of operation: manual and automatic tool execution.
646///
647/// # State Management
648///
649/// The client maintains several pieces of state that persist across multiple turns:
650///
651/// - **Conversation History**: Complete record of all messages exchanged
652/// - **Active Stream**: Currently active SSE stream being consumed
653/// - **Interrupt Flag**: Thread-safe cancellation signal
654/// - **Auto-Execution Buffer**: Cached blocks for auto-execution mode
655///
656/// # Operating Modes
657///
658/// ## Manual Mode (default)
659///
660/// In manual mode, the client streams blocks directly to the caller. When the model
661/// requests a tool, you receive a `ToolUseBlock`, execute the tool yourself, add the
662/// result with `add_tool_result()`, and continue the conversation.
663///
664/// **Advantages**:
665/// - Full control over tool execution
666/// - Custom error handling per tool
667/// - Ability to modify tool inputs/outputs
668/// - Interactive debugging capabilities
669///
670/// ## Automatic Mode (`auto_execute_tools = true`)
671///
672/// In automatic mode, the client executes tools transparently and only returns the
673/// final text response after all tool iterations complete.
674///
675/// **Advantages**:
676/// - Simpler API for common use cases
677/// - Built-in retry logic via hooks
678/// - Automatic conversation continuation
679/// - Configurable iteration limits
680///
681/// # Thread Safety
682///
683/// The client is NOT thread-safe for concurrent use. However, the interrupt mechanism
684/// uses `Arc<AtomicBool>` which can be safely shared across threads to signal cancellation.
685///
686/// # Memory Management
687///
688/// - History grows unbounded by default (consider clearing periodically)
689/// - Streams are consumed lazily (low memory footprint during streaming)
690/// - Auto-execution buffers entire response (higher memory in auto mode)
691///
692/// # Examples
693///
694/// ## Basic Multi-Turn Conversation
695///
696/// ```rust,no_run
697/// use open_agent::{Client, AgentOptions, ContentBlock};
698///
699/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
700/// let mut client = Client::new(AgentOptions::builder()
701/// .model("gpt-4")
702/// .api_key("sk-...")
703/// .build()?)?;
704///
705/// // First question
706/// client.send("What's the capital of France?").await?;
707/// while let Some(block) = client.receive().await? {
708/// if let ContentBlock::Text(text) = block {
709/// println!("{}", text.text); // "Paris is the capital of France."
710/// }
711/// }
712///
713/// // Follow-up question - history is automatically maintained
714/// client.send("What's its population?").await?;
715/// while let Some(block) = client.receive().await? {
716/// if let ContentBlock::Text(text) = block {
717/// println!("{}", text.text); // "Paris has approximately 2.2 million people."
718/// }
719/// }
720/// # Ok(())
721/// # }
722/// ```
723///
724/// ## Manual Tool Execution
725///
726/// ```rust,no_run
727/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
728/// use serde_json::json;
729///
730/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
731/// let calculator = Tool::new(
732/// "calculator",
733/// "Performs arithmetic",
734/// json!({"type": "object"}),
735/// |input| Box::pin(async move { Ok(json!({"result": 42})) })
736/// );
737///
738/// let mut client = Client::new(AgentOptions::builder()
739/// .model("gpt-4")
740/// .api_key("sk-...")
741/// .tools(vec![calculator])
742/// .build()?)?;
743///
744/// client.send("What's 2+2?").await?;
745///
746/// while let Some(block) = client.receive().await? {
747/// match block {
748/// ContentBlock::ToolUse(tool_use) => {
749/// // Execute tool manually
750/// let result = json!({"result": 4});
751/// client.add_tool_result(&tool_use.id, result)?;
752///
753/// // Continue conversation to get model's response
754/// client.send("").await?;
755/// }
756/// ContentBlock::Text(text) => {
757/// println!("{}", text.text); // "The result is 4."
758/// }
759/// _ => {}
760/// }
761/// }
762/// # Ok(())
763/// # }
764/// ```
765///
766/// ## Automatic Tool Execution
767///
768/// ```rust,no_run
769/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
770/// use serde_json::json;
771///
772/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
773/// let calculator = Tool::new(
774/// "calculator",
775/// "Performs arithmetic",
776/// json!({"type": "object"}),
777/// |input| Box::pin(async move { Ok(json!({"result": 42})) })
778/// );
779///
780/// let mut client = Client::new(AgentOptions::builder()
781/// .model("gpt-4")
782/// .api_key("sk-...")
783/// .tools(vec![calculator])
784/// .auto_execute_tools(true) // Enable auto-execution
785/// .build()?)?;
786///
787/// client.send("What's 2+2?").await?;
788///
789/// // Tools execute automatically - you only receive final text
790/// while let Some(block) = client.receive().await? {
791/// if let ContentBlock::Text(text) = block {
792/// println!("{}", text.text); // "The result is 4."
793/// }
794/// }
795/// # Ok(())
796/// # }
797/// ```
798///
799/// ## With Interruption
800///
801/// ```rust,no_run
802/// use open_agent::{Client, AgentOptions};
803/// use std::time::Duration;
804///
805/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
806/// let mut client = Client::new(AgentOptions::default())?;
807///
808/// // Start a long-running query
809/// client.send("Write a very long story").await?;
810///
811/// // Spawn a task to interrupt after timeout
812/// let interrupt_handle = client.interrupt_handle();
813/// tokio::spawn(async move {
814/// tokio::time::sleep(Duration::from_secs(5)).await;
815/// interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
816/// });
817///
818/// // This loop will stop when interrupted
819/// while let Some(block) = client.receive().await? {
820/// // Process blocks...
821/// }
822///
823/// // Client is still usable after interruption
824/// client.send("What's 2+2?").await?;
825/// # Ok(())
826/// # }
827/// ```
828pub struct Client {
829 /// Configuration options including model, API key, tools, hooks, etc.
830 ///
831 /// This field contains all the settings that control how the client behaves.
832 /// It's set once during construction and cannot be modified (though you can
833 /// access it via `options()` for inspection).
834 options: AgentOptions,
835
836 /// Complete conversation history as a sequence of messages.
837 ///
838 /// Each message contains a role (System/User/Assistant/Tool) and content blocks.
839 /// History grows unbounded by default - use `clear_history()` to reset.
840 ///
841 /// **Important**: The history includes ALL messages, not just user/assistant.
842 /// This includes tool results and intermediate assistant messages from tool calls.
843 history: Vec<Message>,
844
845 /// Currently active SSE stream being consumed.
846 ///
847 /// This is `Some(stream)` while a response is being received, and `None` when
848 /// no request is in flight or after a response completes.
849 ///
850 /// The stream is set by `send()` and consumed by `receive()`. When the stream
851 /// is exhausted, `receive()` returns `Ok(None)` and sets this back to `None`.
852 current_stream: Option<ContentStream>,
853
854 /// Reusable HTTP client for making API requests.
855 ///
856 /// Configured once during construction with the timeout from `AgentOptions`.
857 /// Reusing the same client across requests enables connection pooling and
858 /// better performance for multi-turn conversations.
859 http_client: reqwest::Client,
860
861 /// Thread-safe interrupt flag for cancellation.
862 ///
863 /// This `Arc<AtomicBool>` can be cloned and shared across threads or async tasks
864 /// to signal cancellation. When set to `true`, the next `receive()` call will
865 /// return `Ok(None)` and clear the current stream.
866 ///
867 /// The flag is automatically reset to `false` at the start of each `send()` call.
868 ///
869 /// **Thread Safety**: Can be safely accessed from multiple threads using atomic
870 /// operations. However, only one thread should call `send()`/`receive()`.
871 interrupted: Arc<AtomicBool>,
872
873 /// Buffer of content blocks for auto-execution mode.
874 ///
875 /// When `auto_execute_tools` is enabled, `receive()` internally calls the
876 /// auto-execution loop which buffers all final text blocks here. Subsequent
877 /// calls to `receive()` return blocks from this buffer one at a time.
878 ///
879 /// **Only used when `options.auto_execute_tools == true`**.
880 ///
881 /// The buffer is cleared when starting a new auto-execution loop.
882 auto_exec_buffer: Vec<ContentBlock>,
883
884 /// Current read position in the auto-execution buffer.
885 ///
886 /// Tracks which block to return next when `receive()` is called in auto mode.
887 /// Reset to 0 when the buffer is refilled with a new response.
888 ///
889 /// **Only used when `options.auto_execute_tools == true`**.
890 auto_exec_index: usize,
891}
892
893impl Client {
894 /// Creates a new client with the specified configuration.
895 ///
896 /// This constructor initializes all state fields and creates a reusable HTTP client
897 /// configured with the timeout from `AgentOptions`.
898 ///
899 /// # Parameters
900 ///
901 /// - `options`: Configuration including model, API key, tools, hooks, etc.
902 ///
903 /// # Errors
904 ///
905 /// Returns an error if the HTTP client cannot be built. This can happen due to:
906 /// - Invalid TLS configuration
907 /// - System resource exhaustion
908 /// - Invalid timeout values
909 ///
910 /// # Examples
911 ///
912 /// ```rust
913 /// use open_agent::{Client, AgentOptions};
914 ///
915 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
916 /// let client = Client::new(AgentOptions::builder()
917 /// .model("gpt-4")
918 /// .base_url("http://localhost:1234/v1")
919 /// .build()?)?;
920 /// # Ok(())
921 /// # }
922 /// ```
923 pub fn new(options: AgentOptions) -> Result<Self> {
924 // Build HTTP client with configured timeout
925 // This client is reused across all requests for connection pooling
926 let http_client = reqwest::Client::builder()
927 .timeout(Duration::from_secs(options.timeout()))
928 .build()
929 .map_err(|e| Error::config(format!("Failed to build HTTP client: {}", e)))?;
930
931 Ok(Self {
932 options,
933 history: Vec::new(), // Empty conversation history
934 current_stream: None, // No active stream yet
935 http_client,
936 interrupted: Arc::new(AtomicBool::new(false)), // Not interrupted initially
937 auto_exec_buffer: Vec::new(), // Empty buffer for auto mode
938 auto_exec_index: 0, // Start at beginning of buffer
939 })
940 }
941
942 /// Sends a user message and initiates streaming of the model's response.
943 ///
944 /// This method performs several critical steps:
945 ///
946 /// 1. Executes UserPromptSubmit hooks (which can modify or block the prompt)
947 /// 2. Adds the user message to conversation history
948 /// 3. Builds and sends HTTP request to the OpenAI-compatible API
949 /// 4. Parses the SSE stream and sets up aggregation
950 /// 5. Stores the stream for consumption via `receive()`
951 ///
952 /// # Parameters
953 ///
954 /// - `prompt`: The user's message. Can be empty to continue conversation after
955 /// adding tool results (common pattern in manual tool execution mode).
956 ///
957 /// # Returns
958 ///
959 /// - `Ok(())`: Request sent successfully, call `receive()` to get blocks
960 /// - `Err(e)`: Request failed (network error, API error, hook blocked, etc.)
961 ///
962 /// # Behavior Details
963 ///
964 /// ## Hook Execution
965 ///
966 /// Before sending, UserPromptSubmit hooks are executed. Hooks can:
967 /// - Modify the prompt text
968 /// - Block the request entirely
969 /// - Access conversation history
970 ///
971 /// If a hook blocks the request, this method returns an error immediately.
972 ///
973 /// ## History Management
974 ///
975 /// The prompt is added to history BEFORE sending the request. This ensures
976 /// that history is consistent even if the request fails.
977 ///
978 /// ## Stream Setup
979 ///
980 /// The response stream is set up but not consumed. You must call `receive()`
981 /// repeatedly to get content blocks. The stream remains active until:
982 /// - All blocks are consumed (stream naturally ends)
983 /// - An error occurs
984 /// - Interrupt is triggered
985 ///
986 /// ## Interrupt Handling
987 ///
988 /// The interrupt flag is reset to `false` at the start of this method,
989 /// allowing a fresh request after a previous interruption.
990 ///
991 /// # State Changes
992 ///
993 /// - Resets `interrupted` flag to `false`
994 /// - Appends user message to `history`
995 /// - Sets `current_stream` to new SSE stream
996 /// - Does NOT modify `auto_exec_buffer` or `auto_exec_index`
997 ///
998 /// # Errors
999 ///
1000 /// Returns errors for:
1001 /// - Hook blocking the prompt
1002 /// - HTTP client errors (network failure, DNS, etc.)
1003 /// - API errors (auth failure, invalid model, rate limits)
1004 /// - Invalid response format
1005 ///
1006 /// After an error, the client remains usable for new requests.
1007 ///
1008 /// # Examples
1009 ///
1010 /// ## Basic Usage
1011 ///
1012 /// ```rust,no_run
1013 /// # use open_agent::{Client, AgentOptions};
1014 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1015 /// # let mut client = Client::new(AgentOptions::default())?;
1016 /// client.send("Hello!").await?;
1017 ///
1018 /// while let Some(block) = client.receive().await? {
1019 /// // Process blocks...
1020 /// }
1021 /// # Ok(())
1022 /// # }
1023 /// ```
1024 ///
1025 /// ## Continuing After Tool Result
1026 ///
1027 /// ```rust,no_run
1028 /// # use open_agent::{Client, AgentOptions, ContentBlock};
1029 /// # use serde_json::json;
1030 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1031 /// # let mut client = Client::new(AgentOptions::default())?;
1032 /// client.send("Use the calculator").await?;
1033 ///
1034 /// while let Some(block) = client.receive().await? {
1035 /// if let ContentBlock::ToolUse(tool_use) = block {
1036 /// // Execute tool and add result
1037 /// client.add_tool_result(&tool_use.id, json!({"result": 42}))?;
1038 ///
1039 /// // Continue conversation with empty prompt
1040 /// client.send("").await?;
1041 /// }
1042 /// }
1043 /// # Ok(())
1044 /// # }
1045 /// ```
1046 pub async fn send(&mut self, prompt: &str) -> Result<()> {
1047 use crate::hooks::UserPromptSubmitEvent;
1048
1049 // Reset interrupt flag for new query
1050 // This allows the client to be reused after a previous interruption
1051 // Uses SeqCst ordering to ensure visibility across all threads
1052 self.interrupted.store(false, Ordering::SeqCst);
1053
1054 // Execute UserPromptSubmit hooks
1055 // Hooks run BEFORE adding to history, allowing modification or blocking
1056 let mut final_prompt = prompt.to_string();
1057 let history_snapshot: Vec<serde_json::Value> = self
1058 .history
1059 .iter()
1060 .map(|_| serde_json::json!({})) // Simplified snapshot for hooks
1061 .collect();
1062
1063 // Create hook event with current prompt and history
1064 let event = UserPromptSubmitEvent::new(final_prompt.clone(), history_snapshot);
1065
1066 // Execute all registered UserPromptSubmit hooks
1067 if let Some(decision) = self.options.hooks().execute_user_prompt_submit(event).await {
1068 // Check if hook wants to block execution
1069 if !decision.continue_execution() {
1070 return Err(Error::other(format!(
1071 "Prompt blocked by hook: {}",
1072 decision.reason().unwrap_or("")
1073 )));
1074 }
1075 // Apply any prompt modifications from hooks
1076 if let Some(modified) = decision.modified_prompt() {
1077 final_prompt = modified.to_string();
1078 }
1079 }
1080
1081 // Add user message to history BEFORE sending request
1082 // This ensures history consistency even if request fails
1083 // Empty prompts are still added (needed for tool continuation)
1084 self.history.push(Message::user(final_prompt));
1085
1086 // Build messages array for API request
1087 // This includes system prompt + full conversation history
1088 let mut messages = Vec::new();
1089
1090 // Add system prompt as first message if configured
1091 // System prompts are added fresh for each request (not from history)
1092 if !self.options.system_prompt().is_empty() {
1093 messages.push(OpenAIMessage {
1094 role: "system".to_string(),
1095 content: self.options.system_prompt().to_string(),
1096 tool_calls: None,
1097 tool_call_id: None,
1098 });
1099 }
1100
1101 // Convert conversation history to OpenAI message format
1102 // This includes user prompts, assistant responses, and tool results
1103 for msg in &self.history {
1104 // Extract text content from all content blocks
1105 //
1106 // KNOWN LIMITATION: ToolUse and ToolResult blocks are not serialized to OpenAI format.
1107 // This works for the current streaming use case where tool calls are handled via
1108 // ContentBlock::ToolUse/ToolResult in the response stream, but means conversation
1109 // history cannot be fully round-tripped through OpenAI format. To implement proper
1110 // serialization:
1111 // 1. When processing ContentBlock::ToolUse, populate tool_calls array
1112 // 2. When processing ContentBlock::ToolResult, populate tool_call_id field
1113 // 3. Handle the different message structures (assistant with tool_calls vs tool with tool_call_id)
1114 let content = msg
1115 .content
1116 .iter()
1117 .filter_map(|block| match block {
1118 ContentBlock::Text(text) => Some(text.text.clone()),
1119 _ => None, // Skip ToolUse and ToolResult blocks (see limitation above)
1120 })
1121 .collect::<Vec<_>>()
1122 .join("\n");
1123
1124 // Convert our MessageRole enum to OpenAI role string
1125 messages.push(OpenAIMessage {
1126 role: match msg.role {
1127 MessageRole::System => "system",
1128 MessageRole::User => "user",
1129 MessageRole::Assistant => "assistant",
1130 MessageRole::Tool => "tool",
1131 }
1132 .to_string(),
1133 content,
1134 tool_calls: None, // LIMITATION: Not populated from ToolUse blocks (see above)
1135 tool_call_id: None, // LIMITATION: Not populated from ToolResult blocks (see above)
1136 });
1137 }
1138
1139 // Convert tools to OpenAI format if any are registered
1140 // Each tool is described with name, description, and JSON Schema parameters
1141 let tools = if !self.options.tools().is_empty() {
1142 Some(
1143 self.options
1144 .tools()
1145 .iter()
1146 .map(|t| t.to_openai_format())
1147 .collect(),
1148 )
1149 } else {
1150 None
1151 };
1152
1153 // Build the OpenAI-compatible request payload
1154 let request = OpenAIRequest {
1155 model: self.options.model().to_string(),
1156 messages,
1157 stream: true, // Always stream for progressive rendering
1158 max_tokens: self.options.max_tokens(),
1159 temperature: Some(self.options.temperature()),
1160 tools,
1161 };
1162
1163 // Make HTTP POST request to chat completions endpoint
1164 let url = format!("{}/chat/completions", self.options.base_url());
1165 let response = self
1166 .http_client
1167 .post(&url)
1168 .header(
1169 "Authorization",
1170 format!("Bearer {}", self.options.api_key()),
1171 )
1172 .header("Content-Type", "application/json")
1173 .json(&request)
1174 .send()
1175 .await
1176 .map_err(Error::Http)?;
1177
1178 // Check for HTTP-level errors before processing stream
1179 // This catches authentication, rate limits, invalid models, etc.
1180 if !response.status().is_success() {
1181 let status = response.status();
1182 let body = response.text().await.unwrap_or_else(|e| {
1183 eprintln!("WARNING: Failed to read error response body: {}", e);
1184 "Unknown error (failed to read response body)".to_string()
1185 });
1186 return Err(Error::api(format!("API error {}: {}", status, body)));
1187 }
1188
1189 // Parse Server-Sent Events (SSE) stream from response
1190 let sse_stream = parse_sse_stream(response);
1191
1192 // Aggregate SSE chunks into complete content blocks
1193 // ToolCallAggregator maintains state to handle incremental JSON chunks
1194 // that may arrive split across multiple SSE events
1195 let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
1196 let result = match chunk_result {
1197 Ok(chunk) => match aggregator.process_chunk(chunk) {
1198 Ok(blocks) => {
1199 if blocks.is_empty() {
1200 Some(None) // Partial chunk, keep aggregating
1201 } else {
1202 Some(Some(Ok(blocks))) // Complete block(s) ready
1203 }
1204 }
1205 Err(e) => Some(Some(Err(e))), // Processing error
1206 },
1207 Err(e) => Some(Some(Err(e))), // Stream error
1208 };
1209 futures::future::ready(result)
1210 });
1211
1212 // Flatten the stream to emit individual blocks
1213 // filter_map removes None values (partial chunks)
1214 // flat_map converts Vec<ContentBlock> to individual items
1215 let flattened = stream
1216 .filter_map(|item| async move { item })
1217 .flat_map(|result| {
1218 futures::stream::iter(match result {
1219 Ok(blocks) => blocks.into_iter().map(Ok).collect(),
1220 Err(e) => vec![Err(e)],
1221 })
1222 });
1223
1224 // Store the stream for consumption via receive()
1225 // The stream is NOT consumed here - that happens in receive()
1226 self.current_stream = Some(Box::pin(flattened));
1227
1228 Ok(())
1229 }
1230
1231 /// Internal method that returns one block from the current stream.
1232 ///
1233 /// This is the core streaming logic extracted for reuse by both manual mode
1234 /// and auto-execution mode. It handles interrupt checking and stream consumption.
1235 ///
1236 /// # Returns
1237 ///
1238 /// - `Ok(Some(block))`: Successfully received a content block
1239 /// - `Ok(None)`: Stream ended naturally or was interrupted
1240 /// - `Err(e)`: An error occurred during streaming
1241 ///
1242 /// # State Changes
1243 ///
1244 /// - Sets `current_stream` to `None` if interrupted or stream ends
1245 /// - Does not modify history or other state
1246 ///
1247 /// # Implementation Notes
1248 ///
1249 /// This method checks the interrupt flag on every call, allowing responsive
1250 /// cancellation. The check uses SeqCst ordering for immediate visibility of
1251 /// interrupts from other threads.
1252 async fn receive_one(&mut self) -> Result<Option<ContentBlock>> {
1253 // Check interrupt flag before attempting to receive
1254 // Uses SeqCst to ensure we see the latest value from any thread
1255 if self.interrupted.load(Ordering::SeqCst) {
1256 // Clear the stream and return None to signal completion
1257 self.current_stream = None;
1258 return Ok(None);
1259 }
1260
1261 // Poll the current stream if one exists
1262 if let Some(stream) = &mut self.current_stream {
1263 match stream.next().await {
1264 Some(Ok(block)) => Ok(Some(block)), // Got a block
1265 Some(Err(e)) => Err(e), // Stream error
1266 None => Ok(None), // Stream ended
1267 }
1268 } else {
1269 // No active stream
1270 Ok(None)
1271 }
1272 }
1273
1274 /// Collects all blocks from the current stream into a vector.
1275 ///
1276 /// Internal helper for auto-execution mode. This method buffers the entire
1277 /// response in memory, which is necessary to determine if the response contains
1278 /// tool calls before returning anything to the caller.
1279 ///
1280 /// # Returns
1281 ///
1282 /// - `Ok(vec)`: Successfully collected all blocks
1283 /// - `Err(e)`: Error during collection or interrupted
1284 ///
1285 /// # Memory Usage
1286 ///
1287 /// This buffers the entire response, which can be large for long completions.
1288 /// Consider the memory implications when using auto-execution mode.
1289 ///
1290 /// # Interruption
1291 ///
1292 /// Checks interrupt flag during collection and returns error if interrupted.
1293 async fn collect_all_blocks(&mut self) -> Result<Vec<ContentBlock>> {
1294 let mut blocks = Vec::new();
1295
1296 // Consume entire stream into vector
1297 while let Some(block) = self.receive_one().await? {
1298 // Check interrupt during collection for responsiveness
1299 if self.interrupted.load(Ordering::SeqCst) {
1300 self.current_stream = None;
1301 return Err(Error::other(
1302 "Operation interrupted during block collection",
1303 ));
1304 }
1305
1306 blocks.push(block);
1307 }
1308
1309 Ok(blocks)
1310 }
1311
1312 /// Executes a tool by name with the given input.
1313 ///
1314 /// Internal helper for auto-execution mode. Looks up the tool in the registered
1315 /// tools list and executes it with the provided input.
1316 ///
1317 /// # Parameters
1318 ///
1319 /// - `tool_name`: Name of the tool to execute
1320 /// - `input`: JSON value containing tool parameters
1321 ///
1322 /// # Returns
1323 ///
1324 /// - `Ok(result)`: Tool executed successfully, returns result as JSON
1325 /// - `Err(e)`: Tool not found or execution failed
1326 ///
1327 /// # Error Handling
1328 ///
1329 /// If the tool is not found in the registry, returns a ToolError.
1330 /// If execution fails, the error from the tool is propagated.
1331 async fn execute_tool_internal(
1332 &self,
1333 tool_name: &str,
1334 input: serde_json::Value,
1335 ) -> Result<serde_json::Value> {
1336 // Find tool in registered tools by name
1337 let tool = self
1338 .options
1339 .tools()
1340 .iter()
1341 .find(|t| t.name() == tool_name)
1342 .ok_or_else(|| Error::tool(format!("Tool '{}' not found", tool_name)))?;
1343
1344 // Execute the tool's async function
1345 tool.execute(input).await
1346 }
1347
1348 /// Auto-execution loop that handles tool calls automatically.
1349 ///
1350 /// This is the core implementation of automatic tool execution mode. It:
1351 ///
1352 /// 1. Collects all blocks from the current stream
1353 /// 2. Separates text blocks from tool use blocks
1354 /// 3. If there are tool blocks:
1355 /// - Executes PreToolUse hooks (can modify/block)
1356 /// - Executes each tool via its registered function
1357 /// - Executes PostToolUse hooks (can modify result)
1358 /// - Adds results to history
1359 /// - Continues conversation with send("")
1360 /// 4. Repeats until text-only response or max iterations
1361 /// 5. Returns all final text blocks
1362 ///
1363 /// # Returns
1364 ///
1365 /// - `Ok(blocks)`: Final text blocks after all tool iterations
1366 /// - `Err(e)`: Error during execution, stream processing, or interruption
1367 ///
1368 /// # Iteration Limit
1369 ///
1370 /// The loop is bounded by `options.max_tool_iterations` to prevent infinite loops.
1371 /// When the limit is reached, the loop stops and returns whatever text blocks
1372 /// have been collected so far.
1373 ///
1374 /// # Hook Integration
1375 ///
1376 /// Hooks are executed for each tool call:
1377 /// - **PreToolUse**: Can modify input or block execution entirely
1378 /// - **PostToolUse**: Can modify the result before it's added to history
1379 ///
1380 /// If a hook blocks execution, a JSON error response is used as the tool result.
1381 ///
1382 /// # State Management
1383 ///
1384 /// The loop maintains history by adding:
1385 /// - Assistant messages with text + tool use blocks
1386 /// - User messages with tool result blocks
1387 ///
1388 /// This creates a proper conversation flow that the model can follow.
1389 ///
1390 /// # Error Recovery
1391 ///
1392 /// If a tool execution fails, the error is converted to a JSON error response
1393 /// and added as the tool result. This allows the conversation to continue
1394 /// and lets the model handle the error.
1395 async fn auto_execute_loop(&mut self) -> Result<Vec<ContentBlock>> {
1396 use crate::types::ToolResultBlock;
1397
1398 // Track iterations to prevent infinite loops
1399 let mut iteration = 0;
1400 let max_iterations = self.options.max_tool_iterations();
1401
1402 loop {
1403 // ========================================================================
1404 // STEP 1: Collect all blocks from current stream
1405 // ========================================================================
1406 // Buffer the entire response to determine if it contains tool calls
1407 let blocks = self.collect_all_blocks().await?;
1408
1409 // Empty response means stream ended or was interrupted
1410 if blocks.is_empty() {
1411 return Ok(Vec::new());
1412 }
1413
1414 // ========================================================================
1415 // STEP 2: Separate text blocks from tool use blocks
1416 // ========================================================================
1417 // The model can return a mix of text and tool calls in one response
1418 let mut text_blocks = Vec::new();
1419 let mut tool_blocks = Vec::new();
1420
1421 for block in blocks {
1422 match block {
1423 ContentBlock::Text(_) => text_blocks.push(block),
1424 ContentBlock::ToolUse(_) => tool_blocks.push(block),
1425 _ => {} // Ignore ToolResult and other variants
1426 }
1427 }
1428
1429 // ========================================================================
1430 // STEP 3: Check if we're done (no tool calls)
1431 // ========================================================================
1432 // If the response contains no tool calls, we've reached the final answer
1433 if tool_blocks.is_empty() {
1434 // Add assistant's final text response to history
1435 if !text_blocks.is_empty() {
1436 let assistant_msg = Message::assistant(text_blocks.clone());
1437 self.history.push(assistant_msg);
1438 }
1439 // Return text blocks to caller via buffered receive()
1440 return Ok(text_blocks);
1441 }
1442
1443 // ========================================================================
1444 // STEP 4: Check iteration limit BEFORE executing tools
1445 // ========================================================================
1446 // Increment counter and check if we've hit the max
1447 iteration += 1;
1448 if iteration > max_iterations {
1449 // Max iterations reached - stop execution and return what we have
1450 // This prevents infinite tool-calling loops
1451 if !text_blocks.is_empty() {
1452 let assistant_msg = Message::assistant(text_blocks.clone());
1453 self.history.push(assistant_msg);
1454 }
1455 return Ok(text_blocks);
1456 }
1457
1458 // ========================================================================
1459 // STEP 5: Add assistant message to history
1460 // ========================================================================
1461 // The assistant message includes BOTH text and tool use blocks
1462 // This preserves the full context for future turns
1463 let mut all_blocks = text_blocks.clone();
1464 all_blocks.extend(tool_blocks.clone());
1465 let assistant_msg = Message::assistant(all_blocks);
1466 self.history.push(assistant_msg);
1467
1468 // ========================================================================
1469 // STEP 6: Execute all tools and collect results
1470 // ========================================================================
1471 for block in tool_blocks {
1472 if let ContentBlock::ToolUse(tool_use) = block {
1473 // Create simplified history snapshot for hooks
1474 // TODO: Full serialization of history for hooks
1475 let history_snapshot: Vec<serde_json::Value> =
1476 self.history.iter().map(|_| serde_json::json!({})).collect();
1477
1478 // ============================================================
1479 // Execute PreToolUse hooks
1480 // ============================================================
1481 use crate::hooks::PreToolUseEvent;
1482 let pre_event = PreToolUseEvent::new(
1483 tool_use.name.clone(),
1484 tool_use.input.clone(),
1485 tool_use.id.clone(),
1486 history_snapshot.clone(),
1487 );
1488
1489 // Track whether to execute and what input to use
1490 let mut tool_input = tool_use.input.clone();
1491 let mut should_execute = true;
1492 let mut block_reason = None;
1493
1494 // Execute all PreToolUse hooks
1495 if let Some(decision) =
1496 self.options.hooks().execute_pre_tool_use(pre_event).await
1497 {
1498 if !decision.continue_execution() {
1499 // Hook blocked execution
1500 should_execute = false;
1501 block_reason = decision.reason().map(|s| s.to_string());
1502 } else if let Some(modified) = decision.modified_input() {
1503 // Hook modified the input
1504 tool_input = modified.clone();
1505 }
1506 }
1507
1508 // ============================================================
1509 // Execute tool (or create error result if blocked)
1510 // ============================================================
1511 let result = if should_execute {
1512 // Actually execute the tool
1513 match self
1514 .execute_tool_internal(&tool_use.name, tool_input.clone())
1515 .await
1516 {
1517 Ok(res) => res, // Success - use the result
1518 Err(e) => {
1519 // Tool execution failed - convert to JSON error
1520 // This allows the conversation to continue
1521 serde_json::json!({
1522 "error": e.to_string(),
1523 "tool": tool_use.name,
1524 "id": tool_use.id
1525 })
1526 }
1527 }
1528 } else {
1529 // Tool blocked by PreToolUse hook - create error result
1530 serde_json::json!({
1531 "error": "Tool execution blocked by hook",
1532 "reason": block_reason.unwrap_or_else(|| "No reason provided".to_string()),
1533 "tool": tool_use.name,
1534 "id": tool_use.id
1535 })
1536 };
1537
1538 // ============================================================
1539 // Execute PostToolUse hooks
1540 // ============================================================
1541 use crate::hooks::PostToolUseEvent;
1542 let post_event = PostToolUseEvent::new(
1543 tool_use.name.clone(),
1544 tool_input,
1545 tool_use.id.clone(),
1546 result.clone(),
1547 history_snapshot,
1548 );
1549
1550 let mut final_result = result;
1551 if let Some(decision) =
1552 self.options.hooks().execute_post_tool_use(post_event).await
1553 {
1554 // PostToolUse can modify the result
1555 // Note: Uses modified_input field (naming is historical)
1556 if let Some(modified) = decision.modified_input() {
1557 final_result = modified.clone();
1558 }
1559 }
1560
1561 // ============================================================
1562 // Add tool result to history
1563 // ============================================================
1564 // Tool results are added as user messages (per OpenAI convention)
1565 let tool_result = ToolResultBlock::new(&tool_use.id, final_result);
1566 let tool_result_msg =
1567 Message::user_with_blocks(vec![ContentBlock::ToolResult(tool_result)]);
1568 self.history.push(tool_result_msg);
1569 }
1570 }
1571
1572 // ========================================================================
1573 // STEP 7: Continue conversation to get next response
1574 // ========================================================================
1575 // Send empty string to continue - the history contains all context
1576 self.send("").await?;
1577
1578 // Loop continues to collect and process the next response
1579 // This will either be more tool calls or the final text answer
1580 }
1581 }
1582
1583 /// Receives the next content block from the current stream.
1584 ///
1585 /// This is the primary method for consuming responses from the model. It works
1586 /// differently depending on the operating mode:
1587 ///
1588 /// ## Manual Mode (default)
1589 ///
1590 /// Streams blocks directly from the API response as they arrive. You receive:
1591 /// - `TextBlock`: Incremental text from the model
1592 /// - `ToolUseBlock`: Requests to execute tools
1593 /// - Other block types as they're emitted
1594 ///
1595 /// When you receive a `ToolUseBlock`, you must:
1596 /// 1. Execute the tool yourself
1597 /// 2. Call `add_tool_result()` with the result
1598 /// 3. Call `send("")` to continue the conversation
1599 ///
1600 /// ## Automatic Mode (`auto_execute_tools = true`)
1601 ///
1602 /// Transparently executes tools and only returns final text blocks. The first
1603 /// call to `receive()` triggers the auto-execution loop which:
1604 /// 1. Collects all blocks from the stream
1605 /// 2. Executes any tool calls automatically
1606 /// 3. Continues the conversation until reaching a text-only response
1607 /// 4. Buffers the final text blocks
1608 /// 5. Returns them one at a time on subsequent `receive()` calls
1609 ///
1610 /// # Returns
1611 ///
1612 /// - `Ok(Some(block))`: Successfully received a content block
1613 /// - `Ok(None)`: Stream ended normally or was interrupted
1614 /// - `Err(e)`: An error occurred during streaming or tool execution
1615 ///
1616 /// # Behavior Details
1617 ///
1618 /// ## Interruption
1619 ///
1620 /// Checks the interrupt flag on every call. If interrupted, immediately returns
1621 /// `Ok(None)` and clears the stream. The client can be reused after interruption.
1622 ///
1623 /// ## Stream Lifecycle
1624 ///
1625 /// 1. After `send()`, stream is active
1626 /// 2. Each `receive()` call yields one block
1627 /// 3. When stream ends, returns `Ok(None)`
1628 /// 4. Subsequent calls continue returning `Ok(None)` until next `send()`
1629 ///
1630 /// ## Auto-Execution Buffer
1631 ///
1632 /// In auto mode, blocks are buffered in memory. The buffer persists until
1633 /// fully consumed (index reaches length), at which point it's cleared.
1634 ///
1635 /// # State Changes
1636 ///
1637 /// - Advances stream position
1638 /// - In auto mode: May trigger entire execution loop and modify history
1639 /// - In manual mode: Only reads from stream, no history changes
1640 /// - Increments `auto_exec_index` when returning buffered blocks
1641 ///
1642 /// # Examples
1643 ///
1644 /// ## Manual Mode - Basic
1645 ///
1646 /// ```rust,no_run
1647 /// # use open_agent::{Client, AgentOptions, ContentBlock};
1648 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1649 /// # let mut client = Client::new(AgentOptions::default())?;
1650 /// client.send("Hello!").await?;
1651 ///
1652 /// while let Some(block) = client.receive().await? {
1653 /// match block {
1654 /// ContentBlock::Text(text) => print!("{}", text.text),
1655 /// _ => {}
1656 /// }
1657 /// }
1658 /// # Ok(())
1659 /// # }
1660 /// ```
1661 ///
1662 /// ## Manual Mode - With Tools
1663 ///
1664 /// ```rust,no_run
1665 /// # use open_agent::{Client, AgentOptions, ContentBlock};
1666 /// # use serde_json::json;
1667 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1668 /// # let mut client = Client::new(AgentOptions::default())?;
1669 /// client.send("Use the calculator").await?;
1670 ///
1671 /// while let Some(block) = client.receive().await? {
1672 /// match block {
1673 /// ContentBlock::Text(text) => {
1674 /// println!("{}", text.text);
1675 /// }
1676 /// ContentBlock::ToolUse(tool_use) => {
1677 /// println!("Executing: {}", tool_use.name);
1678 ///
1679 /// // Execute tool manually
1680 /// let result = json!({"result": 42});
1681 ///
1682 /// // Add result and continue
1683 /// client.add_tool_result(&tool_use.id, result)?;
1684 /// client.send("").await?;
1685 /// }
1686 /// _ => {}
1687 /// }
1688 /// }
1689 /// # Ok(())
1690 /// # }
1691 /// ```
1692 ///
1693 /// ## Auto Mode
1694 ///
1695 /// ```rust,no_run
1696 /// # use open_agent::{Client, AgentOptions, ContentBlock, Tool};
1697 /// # use serde_json::json;
1698 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1699 /// let mut client = Client::new(AgentOptions::builder()
1700 /// .auto_execute_tools(true)
1701 /// .build()?)?;
1702 ///
1703 /// client.send("Calculate 2+2").await?;
1704 ///
1705 /// // Tools execute automatically - you only get final text
1706 /// while let Some(block) = client.receive().await? {
1707 /// if let ContentBlock::Text(text) = block {
1708 /// println!("{}", text.text);
1709 /// }
1710 /// }
1711 /// # Ok(())
1712 /// # }
1713 /// ```
1714 ///
1715 /// ## With Error Handling
1716 ///
1717 /// ```rust,no_run
1718 /// # use open_agent::{Client, AgentOptions};
1719 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1720 /// # let mut client = Client::new(AgentOptions::default())?;
1721 /// client.send("Hello").await?;
1722 ///
1723 /// loop {
1724 /// match client.receive().await {
1725 /// Ok(Some(block)) => {
1726 /// // Process block
1727 /// }
1728 /// Ok(None) => {
1729 /// // Stream ended
1730 /// break;
1731 /// }
1732 /// Err(e) => {
1733 /// eprintln!("Error: {}", e);
1734 /// break;
1735 /// }
1736 /// }
1737 /// }
1738 /// # Ok(())
1739 /// # }
1740 /// ```
1741 pub async fn receive(&mut self) -> Result<Option<ContentBlock>> {
1742 // ========================================================================
1743 // AUTO-EXECUTION MODE
1744 // ========================================================================
1745 if self.options.auto_execute_tools() {
1746 // Check if we have buffered blocks to return
1747 // In auto mode, all final text blocks are buffered and returned one at a time
1748 if self.auto_exec_index < self.auto_exec_buffer.len() {
1749 // Return next buffered block
1750 let block = self.auto_exec_buffer[self.auto_exec_index].clone();
1751 self.auto_exec_index += 1;
1752 return Ok(Some(block));
1753 }
1754
1755 // No buffered blocks - need to run auto-execution loop
1756 // This only happens on the first receive() call after send()
1757 if self.auto_exec_buffer.is_empty() {
1758 match self.auto_execute_loop().await {
1759 Ok(blocks) => {
1760 // Buffer all final text blocks
1761 self.auto_exec_buffer = blocks;
1762 self.auto_exec_index = 0;
1763
1764 // If no blocks, return None (empty response)
1765 if self.auto_exec_buffer.is_empty() {
1766 return Ok(None);
1767 }
1768
1769 // Return first buffered block
1770 let block = self.auto_exec_buffer[0].clone();
1771 self.auto_exec_index = 1;
1772 return Ok(Some(block));
1773 }
1774 Err(e) => return Err(e),
1775 }
1776 }
1777
1778 // Buffer exhausted - return None
1779 Ok(None)
1780 } else {
1781 // ====================================================================
1782 // MANUAL MODE
1783 // ====================================================================
1784 // Stream blocks directly from API without buffering or auto-execution
1785 self.receive_one().await
1786 }
1787 }
1788
1789 /// Interrupts the current operation by setting the interrupt flag.
1790 ///
1791 /// This method provides a thread-safe way to cancel any in-progress streaming
1792 /// operation. The interrupt flag is checked by `receive()` before each block,
1793 /// allowing responsive cancellation.
1794 ///
1795 /// # Behavior
1796 ///
1797 /// - Sets the atomic interrupt flag to `true`
1798 /// - Next `receive()` call will return `Ok(None)` and clear the stream
1799 /// - Flag is automatically reset to `false` on next `send()` call
1800 /// - Safe to call from any thread (uses atomic operations)
1801 /// - Idempotent: calling multiple times has same effect as calling once
1802 /// - No-op if no operation is in progress
1803 ///
1804 /// # Thread Safety
1805 ///
1806 /// This method uses `Arc<AtomicBool>` internally, which can be safely shared
1807 /// across threads. You can clone the interrupt handle and use it from different
1808 /// threads or async tasks:
1809 ///
1810 /// ```rust,no_run
1811 /// # use open_agent::{Client, AgentOptions};
1812 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1813 /// let mut client = Client::new(AgentOptions::default())?;
1814 /// let interrupt_handle = client.interrupt_handle();
1815 ///
1816 /// // Use from another thread
1817 /// tokio::spawn(async move {
1818 /// tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1819 /// interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
1820 /// });
1821 /// # Ok(())
1822 /// # }
1823 /// ```
1824 ///
1825 /// # State Changes
1826 ///
1827 /// - Sets `interrupted` flag to `true`
1828 /// - Does NOT modify stream, history, or other state directly
1829 /// - Effect takes place on next `receive()` call
1830 ///
1831 /// # Use Cases
1832 ///
1833 /// - User cancellation (e.g., stop button in UI)
1834 /// - Timeout enforcement
1835 /// - Resource cleanup
1836 /// - Emergency shutdown
1837 ///
1838 /// # Examples
1839 ///
1840 /// ## Basic Interruption
1841 ///
1842 /// ```rust,no_run
1843 /// use open_agent::{Client, AgentOptions};
1844 ///
1845 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1846 /// let mut client = Client::new(AgentOptions::default())?;
1847 ///
1848 /// client.send("Tell me a long story").await?;
1849 ///
1850 /// // Interrupt after receiving some blocks
1851 /// let mut count = 0;
1852 /// while let Some(block) = client.receive().await? {
1853 /// count += 1;
1854 /// if count >= 5 {
1855 /// client.interrupt();
1856 /// }
1857 /// }
1858 ///
1859 /// // Client is ready for new queries
1860 /// client.send("What's 2+2?").await?;
1861 /// # Ok(())
1862 /// # }
1863 /// ```
1864 ///
1865 /// ## With Timeout
1866 ///
1867 /// ```rust,no_run
1868 /// use open_agent::{Client, AgentOptions};
1869 /// use std::time::Duration;
1870 ///
1871 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1872 /// let mut client = Client::new(AgentOptions::default())?;
1873 ///
1874 /// client.send("Long request").await?;
1875 ///
1876 /// // Spawn timeout task
1877 /// let interrupt_handle = client.interrupt_handle();
1878 /// tokio::spawn(async move {
1879 /// tokio::time::sleep(Duration::from_secs(10)).await;
1880 /// interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
1881 /// });
1882 ///
1883 /// while let Some(_block) = client.receive().await? {
1884 /// // Process until timeout
1885 /// }
1886 /// # Ok(())
1887 /// # }
1888 /// ```
1889 pub fn interrupt(&self) {
1890 // Set interrupt flag using SeqCst for immediate visibility across all threads
1891 self.interrupted.store(true, Ordering::SeqCst);
1892 }
1893
1894 /// Returns a clone of the interrupt handle for thread-safe cancellation.
1895 ///
1896 /// This method provides access to the shared `Arc<AtomicBool>` interrupt flag,
1897 /// allowing it to be used from other threads or async tasks to signal cancellation.
1898 ///
1899 /// # Returns
1900 ///
1901 /// A cloned `Arc<AtomicBool>` that can be used to interrupt operations from any thread.
1902 ///
1903 /// # Examples
1904 ///
1905 /// ```rust,no_run
1906 /// # use open_agent::{Client, AgentOptions};
1907 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1908 /// let mut client = Client::new(AgentOptions::default())?;
1909 /// let interrupt_handle = client.interrupt_handle();
1910 ///
1911 /// // Use from another thread
1912 /// tokio::spawn(async move {
1913 /// tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1914 /// interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
1915 /// });
1916 /// # Ok(())
1917 /// # }
1918 /// ```
1919 pub fn interrupt_handle(&self) -> Arc<AtomicBool> {
1920 self.interrupted.clone()
1921 }
1922
1923 /// Returns a reference to the conversation history.
1924 ///
1925 /// The history contains all messages exchanged in the conversation, including:
1926 /// - User messages
1927 /// - Assistant messages (with text and tool use blocks)
1928 /// - Tool result messages
1929 ///
1930 /// # Returns
1931 ///
1932 /// A slice of `Message` objects in chronological order.
1933 ///
1934 /// # Use Cases
1935 ///
1936 /// - Inspecting conversation context
1937 /// - Debugging tool execution flow
1938 /// - Saving conversation state
1939 /// - Implementing custom history management
1940 ///
1941 /// # Examples
1942 ///
1943 /// ```rust
1944 /// # use open_agent::{Client, AgentOptions};
1945 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1946 /// let client = Client::new(AgentOptions::default())?;
1947 ///
1948 /// // Initially empty
1949 /// assert_eq!(client.history().len(), 0);
1950 /// # Ok(())
1951 /// # }
1952 /// ```
1953 pub fn history(&self) -> &[Message] {
1954 &self.history
1955 }
1956
1957 /// Returns a mutable reference to the conversation history.
1958 ///
1959 /// This allows you to modify the history directly for advanced use cases like:
1960 /// - Removing old messages to manage context length
1961 /// - Editing messages for retry scenarios
1962 /// - Injecting synthetic messages for testing
1963 ///
1964 /// # Warning
1965 ///
1966 /// Modifying history directly can lead to inconsistent conversation state if not
1967 /// done carefully. The SDK expects history to follow the proper message flow
1968 /// (user → assistant → tool results → assistant, etc.).
1969 ///
1970 /// # Examples
1971 ///
1972 /// ```rust
1973 /// # use open_agent::{Client, AgentOptions};
1974 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1975 /// let mut client = Client::new(AgentOptions::default())?;
1976 ///
1977 /// // Remove oldest messages to stay within context limit
1978 /// if client.history().len() > 50 {
1979 /// client.history_mut().drain(0..10);
1980 /// }
1981 /// # Ok(())
1982 /// # }
1983 /// ```
1984 pub fn history_mut(&mut self) -> &mut Vec<Message> {
1985 &mut self.history
1986 }
1987
1988 /// Returns a reference to the agent configuration options.
1989 ///
1990 /// Provides read-only access to the `AgentOptions` used to configure this client.
1991 ///
1992 /// # Use Cases
1993 ///
1994 /// - Inspecting current configuration
1995 /// - Debugging issues
1996 /// - Conditional logic based on settings
1997 ///
1998 /// # Examples
1999 ///
2000 /// ```rust
2001 /// # use open_agent::{Client, AgentOptions};
2002 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2003 /// let client = Client::new(AgentOptions::builder()
2004 /// .model("gpt-4")
2005 /// .base_url("http://localhost:1234/v1")
2006 /// .build()?)?;
2007 ///
2008 /// println!("Using model: {}", client.options().model());
2009 /// # Ok(())
2010 /// # }
2011 /// ```
2012 pub fn options(&self) -> &AgentOptions {
2013 &self.options
2014 }
2015
2016 /// Clears all conversation history.
2017 ///
2018 /// This resets the conversation to a blank slate while preserving the client
2019 /// configuration (tools, hooks, model, etc.). The next message will start a
2020 /// fresh conversation with no prior context.
2021 ///
2022 /// # State Changes
2023 ///
2024 /// - Clears `history` vector
2025 /// - Does NOT modify current stream, options, or other state
2026 ///
2027 /// # Use Cases
2028 ///
2029 /// - Starting a new conversation
2030 /// - Preventing context length issues
2031 /// - Clearing sensitive data
2032 /// - Implementing conversation sessions
2033 ///
2034 /// # Examples
2035 ///
2036 /// ```rust,no_run
2037 /// # use open_agent::{Client, AgentOptions, ContentBlock};
2038 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2039 /// let mut client = Client::new(AgentOptions::default())?;
2040 ///
2041 /// // First conversation
2042 /// client.send("Hello").await?;
2043 /// while let Some(_) = client.receive().await? {}
2044 ///
2045 /// // Clear and start fresh
2046 /// client.clear_history();
2047 ///
2048 /// // New conversation with no memory of previous
2049 /// client.send("Hello again").await?;
2050 /// # Ok(())
2051 /// # }
2052 /// ```
2053 pub fn clear_history(&mut self) {
2054 self.history.clear();
2055 }
2056
2057 /// Adds a tool result to the conversation history for manual tool execution.
2058 ///
2059 /// This method is used exclusively in **manual mode** after receiving a `ToolUseBlock`.
2060 /// The workflow is:
2061 ///
2062 /// 1. `receive()` returns a `ToolUseBlock`
2063 /// 2. You execute the tool yourself
2064 /// 3. Call `add_tool_result()` with the tool's output
2065 /// 4. Call `send("")` to continue the conversation
2066 /// 5. The model receives the tool result and generates a response
2067 ///
2068 /// # Parameters
2069 ///
2070 /// - `tool_use_id`: The unique ID from the `ToolUseBlock` (must match exactly)
2071 /// - `content`: The tool's output as a JSON value
2072 ///
2073 /// # Behavior
2074 ///
2075 /// Creates a `ToolResultBlock` and adds it to conversation history as a tool message.
2076 /// This preserves the tool call/result pairing that the model needs to understand
2077 /// the conversation flow.
2078 ///
2079 /// # State Changes
2080 ///
2081 /// - Appends a tool message to `history`
2082 /// - Does NOT modify stream or trigger any requests
2083 ///
2084 /// # Important Notes
2085 ///
2086 /// - **Not used in auto mode**: Auto-execution handles tool results automatically
2087 /// - **ID must match**: The `tool_use_id` must match the ID from the `ToolUseBlock`
2088 /// - **No validation**: This method doesn't validate the result format
2089 /// - **Must call send()**: After adding result(s), call `send("")` to continue
2090 ///
2091 /// # Examples
2092 ///
2093 /// ## Basic Manual Tool Execution
2094 ///
2095 /// ```rust,no_run
2096 /// use open_agent::{Client, AgentOptions, ContentBlock};
2097 /// use serde_json::json;
2098 ///
2099 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2100 /// let mut client = Client::new(AgentOptions::default())?;
2101 /// client.send("Use the calculator").await?;
2102 ///
2103 /// while let Some(block) = client.receive().await? {
2104 /// match block {
2105 /// ContentBlock::ToolUse(tool_use) => {
2106 /// // Execute tool manually
2107 /// let result = json!({"result": 42});
2108 ///
2109 /// // Add result to history
2110 /// client.add_tool_result(&tool_use.id, result)?;
2111 ///
2112 /// // Continue conversation to get model's response
2113 /// client.send("").await?;
2114 /// }
2115 /// ContentBlock::Text(text) => {
2116 /// println!("{}", text.text);
2117 /// }
2118 /// _ => {}
2119 /// }
2120 /// }
2121 /// # Ok(())
2122 /// # }
2123 /// ```
2124 ///
2125 /// ## Handling Tool Errors
2126 ///
2127 /// ```rust,no_run
2128 /// use open_agent::{Client, AgentOptions, ContentBlock};
2129 /// use serde_json::json;
2130 ///
2131 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2132 /// # let mut client = Client::new(AgentOptions::default())?;
2133 /// # client.send("test").await?;
2134 /// while let Some(block) = client.receive().await? {
2135 /// if let ContentBlock::ToolUse(tool_use) = block {
2136 /// // Try to execute tool
2137 /// let result = match execute_tool(&tool_use.name, &tool_use.input) {
2138 /// Ok(output) => output,
2139 /// Err(e) => json!({
2140 /// "error": e.to_string(),
2141 /// "tool": tool_use.name
2142 /// })
2143 /// };
2144 ///
2145 /// client.add_tool_result(&tool_use.id, result)?;
2146 /// client.send("").await?;
2147 /// }
2148 /// }
2149 ///
2150 /// # fn execute_tool(name: &str, input: &serde_json::Value) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
2151 /// # Ok(json!({}))
2152 /// # }
2153 /// # Ok(())
2154 /// # }
2155 /// ```
2156 ///
2157 /// ## Multiple Tool Calls
2158 ///
2159 /// ```rust,no_run
2160 /// use open_agent::{Client, AgentOptions, ContentBlock};
2161 /// use serde_json::json;
2162 ///
2163 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2164 /// # let mut client = Client::new(AgentOptions::default())?;
2165 /// client.send("Calculate 2+2 and 3+3").await?;
2166 ///
2167 /// let mut tool_calls = Vec::new();
2168 ///
2169 /// // Collect all tool calls
2170 /// while let Some(block) = client.receive().await? {
2171 /// if let ContentBlock::ToolUse(tool_use) = block {
2172 /// tool_calls.push(tool_use);
2173 /// }
2174 /// }
2175 ///
2176 /// // Execute and add results for all tools
2177 /// for tool_call in tool_calls {
2178 /// let result = json!({"result": 42}); // Execute tool
2179 /// client.add_tool_result(&tool_call.id, result)?;
2180 /// }
2181 ///
2182 /// // Continue conversation
2183 /// client.send("").await?;
2184 /// # Ok(())
2185 /// # }
2186 /// ```
2187 pub fn add_tool_result(&mut self, tool_use_id: &str, content: serde_json::Value) -> Result<()> {
2188 use crate::types::ToolResultBlock;
2189
2190 // Create a tool result block with the given ID and content
2191 let result_block = ToolResultBlock::new(tool_use_id, content);
2192
2193 // Add to history as a tool message
2194 // Note: Currently using a simplified representation with TextBlock
2195 // TODO: Properly serialize ToolResultBlock for OpenAI format
2196 let serialized = serde_json::to_string(&result_block.content)
2197 .map_err(|e| Error::config(format!("Failed to serialize tool result: {}", e)))?;
2198
2199 self.history.push(Message::new(
2200 MessageRole::Tool,
2201 vec![ContentBlock::Text(TextBlock::new(serialized))],
2202 ));
2203
2204 Ok(())
2205 }
2206
2207 /// Looks up a registered tool by name.
2208 ///
2209 /// This method provides access to the tool registry for manual execution scenarios.
2210 /// It searches the tools registered in `AgentOptions` and returns a reference to
2211 /// the matching tool if found.
2212 ///
2213 /// # Parameters
2214 ///
2215 /// - `name`: The tool name to search for (case-sensitive)
2216 ///
2217 /// # Returns
2218 ///
2219 /// - `Some(&Tool)`: Tool found
2220 /// - `None`: No tool with that name
2221 ///
2222 /// # Use Cases
2223 ///
2224 /// - Manual tool execution in response to `ToolUseBlock`
2225 /// - Validating tool availability before offering features
2226 /// - Inspecting tool metadata (name, description, schema)
2227 ///
2228 /// # Examples
2229 ///
2230 /// ## Execute Tool Manually
2231 ///
2232 /// ```rust,no_run
2233 /// use open_agent::{Client, AgentOptions, ContentBlock};
2234 ///
2235 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2236 /// # let mut client = Client::new(AgentOptions::default())?;
2237 /// # client.send("test").await?;
2238 /// while let Some(block) = client.receive().await? {
2239 /// if let ContentBlock::ToolUse(tool_use) = block {
2240 /// if let Some(tool) = client.get_tool(&tool_use.name) {
2241 /// // Execute the tool
2242 /// let result = tool.execute(tool_use.input.clone()).await?;
2243 /// client.add_tool_result(&tool_use.id, result)?;
2244 /// client.send("").await?;
2245 /// } else {
2246 /// println!("Unknown tool: {}", tool_use.name);
2247 /// }
2248 /// }
2249 /// }
2250 /// # Ok(())
2251 /// # }
2252 /// ```
2253 ///
2254 /// ## Check Tool Availability
2255 ///
2256 /// ```rust
2257 /// # use open_agent::{Client, AgentOptions};
2258 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2259 /// # let client = Client::new(AgentOptions::default())?;
2260 /// if client.get_tool("calculator").is_some() {
2261 /// println!("Calculator is available");
2262 /// }
2263 /// # Ok(())
2264 /// # }
2265 /// ```
2266 pub fn get_tool(&self, name: &str) -> Option<&crate::tools::Tool> {
2267 // Search registered tools by name
2268 self.options
2269 .tools()
2270 .iter()
2271 .find(|t| t.name() == name)
2272 .map(|t| t.as_ref())
2273 }
2274}
2275
2276#[cfg(test)]
2277mod tests {
2278 use super::*;
2279
2280 #[test]
2281 fn test_client_creation() {
2282 let options = AgentOptions::builder()
2283 .system_prompt("Test")
2284 .model("test-model")
2285 .base_url("http://localhost:1234/v1")
2286 .build()
2287 .unwrap();
2288
2289 let client = Client::new(options).expect("Should create client successfully");
2290 assert_eq!(client.history().len(), 0);
2291 }
2292
2293 #[test]
2294 fn test_client_new_returns_result() {
2295 // Test that Client::new() returns Result instead of panicking
2296 let options = AgentOptions::builder()
2297 .system_prompt("Test")
2298 .model("test-model")
2299 .base_url("http://localhost:1234/v1")
2300 .build()
2301 .unwrap();
2302
2303 // This should not panic - it should return Ok(client)
2304 let result = Client::new(options);
2305 assert!(result.is_ok(), "Client::new() should return Ok");
2306
2307 let client = result.unwrap();
2308 assert_eq!(client.history().len(), 0);
2309 }
2310
2311 #[test]
2312 fn test_interrupt_flag_initial_state() {
2313 let options = AgentOptions::builder()
2314 .system_prompt("Test")
2315 .model("test-model")
2316 .base_url("http://localhost:1234/v1")
2317 .build()
2318 .unwrap();
2319
2320 let client = Client::new(options).expect("Should create client successfully");
2321 // Initially not interrupted
2322 assert!(!client.interrupted.load(Ordering::SeqCst));
2323 }
2324
2325 #[test]
2326 fn test_interrupt_sets_flag() {
2327 let options = AgentOptions::builder()
2328 .system_prompt("Test")
2329 .model("test-model")
2330 .base_url("http://localhost:1234/v1")
2331 .build()
2332 .unwrap();
2333
2334 let client = Client::new(options).expect("Should create client successfully");
2335 client.interrupt();
2336 assert!(client.interrupted.load(Ordering::SeqCst));
2337 }
2338
2339 #[test]
2340 fn test_interrupt_idempotent() {
2341 let options = AgentOptions::builder()
2342 .system_prompt("Test")
2343 .model("test-model")
2344 .base_url("http://localhost:1234/v1")
2345 .build()
2346 .unwrap();
2347
2348 let client = Client::new(options).expect("Should create client successfully");
2349 client.interrupt();
2350 assert!(client.interrupted.load(Ordering::SeqCst));
2351
2352 // Call again - should still be interrupted
2353 client.interrupt();
2354 assert!(client.interrupted.load(Ordering::SeqCst));
2355 }
2356
2357 #[tokio::test]
2358 async fn test_receive_returns_none_when_interrupted() {
2359 let options = AgentOptions::builder()
2360 .system_prompt("Test")
2361 .model("test-model")
2362 .base_url("http://localhost:1234/v1")
2363 .build()
2364 .unwrap();
2365
2366 let mut client = Client::new(options).expect("Should create client successfully");
2367
2368 // Interrupt before receiving
2369 client.interrupt();
2370
2371 // NEW SIGNATURE: receive() should return Ok(None) when interrupted
2372 let result = client.receive().await;
2373 assert!(result.is_ok());
2374 assert!(result.unwrap().is_none());
2375 }
2376
2377 #[tokio::test]
2378 async fn test_receive_returns_ok_none_when_no_stream() {
2379 let options = AgentOptions::builder()
2380 .system_prompt("Test")
2381 .model("test-model")
2382 .base_url("http://localhost:1234/v1")
2383 .build()
2384 .unwrap();
2385
2386 let mut client = Client::new(options).expect("Should create client successfully");
2387
2388 // No stream started - receive() should return Ok(None)
2389 let result = client.receive().await;
2390 assert!(result.is_ok());
2391 assert!(result.unwrap().is_none());
2392 }
2393
2394 #[tokio::test]
2395 async fn test_receive_error_propagation() {
2396 // This test demonstrates that errors are wrapped in Err(), not Some(Err())
2397 // We'll verify this behavior when we have a mock stream that produces errors
2398 let options = AgentOptions::builder()
2399 .system_prompt("Test")
2400 .model("test-model")
2401 .base_url("http://localhost:1234/v1")
2402 .build()
2403 .unwrap();
2404
2405 let client = Client::new(options).expect("Should create client successfully");
2406
2407 // Signature check: receive() returns Result<Option<ContentBlock>>
2408 // This means we can use ? operator cleanly:
2409 // while let Some(block) = client.receive().await? { ... }
2410
2411 // Type assertion to ensure signature is correct
2412 let _: Result<Option<ContentBlock>> = std::future::ready(Ok(None)).await;
2413 drop(client);
2414 }
2415}