open_agent/client.rs
//! Client for streaming queries and multi-turn conversations
//!
//! This module provides the core streaming client implementation for the Open Agent SDK.
//! It handles communication with OpenAI-compatible APIs, manages conversation history,
//! and provides two modes of operation: manual and automatic tool execution.
//!
//! # Architecture Overview
//!
//! The SDK implements a **streaming-first architecture** where all responses from the model
//! are received as a stream of content blocks. This design enables:
//!
//! - **Progressive rendering**: Display text as it arrives without waiting for completion
//! - **Real-time tool execution**: Execute tools as soon as the model requests them
//! - **Interruption support**: Cancel operations mid-stream without corrupting state
//! - **Memory efficiency**: Process large responses without buffering everything in memory
//!
//! ## Two Operating Modes
//!
//! ### 1. Manual Tool Execution Mode (default)
//!
//! In manual mode, the client streams content blocks directly to the caller. When the model
//! requests a tool, the caller receives a `ToolUseBlock`, executes the tool, and adds the
//! result using `add_tool_result()`. Once every requested tool has a result, the caller
//! continues the conversation with another `send()` call.
//!
//! **Use cases**: Custom tool execution logic, interactive debugging, fine-grained control
//!
//! ### 2. Automatic Tool Execution Mode
//!
//! When `auto_execute_tools` is enabled, the client automatically executes tools and continues
//! the conversation until it receives a text-only response. The caller only receives the final
//! text blocks after all tool iterations complete.
//!
//! **Use cases**: Simple agentic workflows, automated task completion, batch processing
//!
//! ## Request Flow
//!
//! ```text
//! User sends prompt
//! │
//! ├─> UserPromptSubmit hook executes (can modify/block prompt)
//! │
//! ├─> Prompt added to history
//! │
//! ├─> HTTP request to OpenAI-compatible API
//! │
//! ├─> Response streamed as Server-Sent Events (SSE)
//! │
//! ├─> SSE chunks aggregated into ContentBlocks
//! │
//! └─> Blocks emitted to caller (or buffered for auto-execution)
//! ```
//!
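//! The SSE step can be pictured with a simplified, std-only sketch. It is an
//! illustration, not the SDK's `parse_sse_stream`. The hypothetical `sse_payloads`
//! helper assumes one `data:` line per event, ignores other SSE fields and comment
//! lines, and stops at `data: [DONE]`, the terminator sent by OpenAI-style streams.
//!
//! ```rust
//! // Hypothetical helper: returns the payload of each `data:` line, stopping at `[DONE]`.
//! fn sse_payloads(body: &str) -> Vec<&str> {
//!     body.lines()
//!         // Keep only `data:` lines; blank separators, comments and other fields are dropped.
//!         .filter_map(|line| line.strip_prefix("data:"))
//!         .map(str::trim)
//!         // OpenAI-style streams end with a literal `[DONE]` payload.
//!         .take_while(|payload| *payload != "[DONE]")
//!         .filter(|payload| !payload.is_empty())
//!         .collect()
//! }
//!
//! fn main() {
//!     let body = "data: {\"delta\":\"Hel\"}\n\ndata: {\"delta\":\"lo\"}\n\ndata: [DONE]\n";
//!     assert_eq!(sse_payloads(body), vec!["{\"delta\":\"Hel\"}", "{\"delta\":\"lo\"}"]);
//! }
//! ```
//!
//! Each payload is then deserialized and aggregated into `ContentBlock`s, as the last
//! two steps of the diagram show.
//!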
//! ## Tool Execution Flow
//!
//! ### Manual Mode:
//! ```text
//! receive() → ToolUseBlock (collect each one until receive() returns Ok(None))
//! │
//! ├─> Caller executes each requested tool
//! │
//! ├─> Caller calls add_tool_result() for each result
//! │
//! ├─> Caller calls send("") to continue
//! │
//! └─> receive() → TextBlock (model's response)
//! ```
//!
//! ### Auto Mode:
//! ```text
//! receive() triggers auto-execution loop
//! │
//! ├─> Collect all blocks from stream
//! │
//! ├─> For each ToolUseBlock:
//! │   ├─> PreToolUse hook executes (can modify/block)
//! │   ├─> Tool executed via Tool::execute()
//! │   ├─> PostToolUse hook executes (can modify result)
//! │   └─> Result added to history
//! │
//! ├─> Continue conversation with send("")
//! │
//! ├─> Repeat until text-only response or max iterations
//! │
//! └─> Return text blocks one-by-one via receive()
//! ```
//!
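//! The auto-mode loop above can be summarized in a small, self-contained sketch. It
//! simulates the model with a hypothetical `Reply` enum and replaces hooks and tool
//! execution with string bookkeeping, so it shows only the control flow: run every
//! requested tool, continue, and stop at the first text-only reply or at the
//! iteration limit. Returning an error at the limit is an assumption of this sketch,
//! not a description of what the SDK does.
//!
//! ```rust
//! // Hypothetical model reply: either a batch of tool calls or a final answer.
//! enum Reply {
//!     ToolCalls(Vec<&'static str>),
//!     Text(&'static str),
//! }
//!
//! // Returns the final text plus the tool results "added to history", in order.
//! fn auto_execute(
//!     replies: Vec<Reply>,
//!     max_iterations: usize,
//! ) -> Result<(String, Vec<String>), String> {
//!     let mut history = Vec::new();
//!     let mut rounds = 0;
//!     for reply in replies {
//!         match reply {
//!             // A text-only reply ends the loop.
//!             Reply::Text(text) => return Ok((text.to_string(), history)),
//!             Reply::ToolCalls(names) => {
//!                 if rounds == max_iterations {
//!                     return Err(format!("exceeded {max_iterations} tool iterations"));
//!                 }
//!                 rounds += 1;
//!                 for name in names {
//!                     // Stand-in for PreToolUse hook -> Tool::execute -> PostToolUse hook.
//!                     history.push(format!("result of {name}"));
//!                 }
//!                 // Stand-in for `send("")`: the next reply continues the conversation.
//!             }
//!         }
//!     }
//!     Err("model stopped without a text reply".to_string())
//! }
//!
//! fn main() {
//!     let replies = vec![
//!         Reply::ToolCalls(vec!["add"]),
//!         Reply::ToolCalls(vec!["multiply"]),
//!         Reply::Text("The answer is 12."),
//!     ];
//!     let (text, history) = auto_execute(replies, 5).unwrap();
//!     assert_eq!(text, "The answer is 12.");
//!     assert_eq!(history, vec!["result of add", "result of multiply"]);
//! }
//! ```
//!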
//! ## State Management
//!
//! The client maintains several pieces of state:
//!
//! - **history**: Full conversation history (`Vec<Message>`)
//! - **current_stream**: Active SSE stream being consumed (`Option<ContentStream>`)
//! - **interrupted**: Atomic flag for cancellation (`Arc<AtomicBool>`)
//! - **auto_exec_buffer**: Buffered blocks for auto-execution mode (`Vec<ContentBlock>`)
//! - **auto_exec_index**: Current position in buffer (`usize`)
//!
//! ## Interruption Mechanism
//!
//! The interrupt system uses `Arc<AtomicBool>` for thread-safe cancellation:
//!
//! ```rust,no_run
//! # use open_agent::{Client, AgentOptions};
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let mut client = Client::new(AgentOptions::default())?;
//! let handle = client.interrupt_handle(); // Clone of the Arc, usable from other threads
//!
//! // In another thread or async task:
//! tokio::spawn(async move {
//!     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
//!     handle.store(true, std::sync::atomic::Ordering::SeqCst);
//! });
//!
//! // `send()` resets the flag, so only an interrupt raised after this call takes effect
//! client.send("Long request").await?;
//! while let Some(_block) = client.receive().await? {
//!     // Loop ends once the interrupt flag is set
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Hook Integration
//!
//! Hooks provide extension points throughout the request lifecycle:
//!
//! - **UserPromptSubmit**: Called before sending user prompt (can modify or block)
//! - **PreToolUse**: Called before executing each tool (can modify input or block execution)
//! - **PostToolUse**: Called after tool execution (can modify result)
//!
//! UserPromptSubmit runs at the start of every `send()` call. PreToolUse and PostToolUse
//! run inside the automatic tool-execution loop; in manual mode the caller executes tools
//! directly, so the client does not invoke them. All hooks have access to the
//! conversation history.
//!
//! ## Error Handling
//!
//! Errors are propagated immediately and leave the client in a valid state:
//!
//! - **HTTP errors**: Network failures, timeouts, connection issues
//! - **API errors**: Invalid model, authentication failures, rate limits
//! - **Parse errors**: Malformed SSE responses, invalid JSON
//! - **Tool errors**: Tool execution failures (converted to JSON error responses)
//! - **Hook errors**: Hook execution failures or blocked operations
//!
//! After an error, the client remains usable for new requests.
//!
//! # Examples
//!
//! ## Simple Single-Turn Query
//!
//! ```rust,no_run
//! use open_agent::{query, AgentOptions};
//! use futures::StreamExt;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let options = AgentOptions::builder()
//!         .model("gpt-4")
//!         .api_key("sk-...")
//!         .build()?;
//!
//!     let mut stream = query("What is Rust?", &options).await?;
//!
//!     while let Some(block) = stream.next().await {
//!         if let open_agent::ContentBlock::Text(text) = block? {
//!             print!("{}", text.text);
//!         }
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Multi-Turn Conversation
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock};
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .build()?)?;
//!
//! // First question
//! client.send("What's the capital of France?").await?;
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//!
//! // Follow-up question (history is maintained)
//! client.send("What's its population?").await?;
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Manual Tool Execution
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
//! use serde_json::json;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let calculator = Tool::new(
//!     "calculator",
//!     "Performs arithmetic operations",
//!     json!({"type": "object", "properties": {"operation": {"type": "string"}}}),
//!     |_input| Box::pin(async move {
//!         // Custom execution logic
//!         Ok(json!({"result": 42}))
//!     })
//! );
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .tools(vec![calculator])
//!     .build()?)?;
//!
//! client.send("Calculate 2+2").await?;
//!
//! loop {
//!     // Drain the whole response first: `send()` replaces the active stream,
//!     // so calling it mid-stream would drop any remaining tool calls.
//!     let mut tool_uses = Vec::new();
//!     while let Some(block) = client.receive().await? {
//!         match block {
//!             ContentBlock::ToolUse(tool_use) => tool_uses.push(tool_use),
//!             ContentBlock::Text(text) => println!("Response: {}", text.text),
//!             _ => {}
//!         }
//!     }
//!
//!     // No tool requests means the model has given its final answer
//!     if tool_uses.is_empty() {
//!         break;
//!     }
//!
//!     for tool_use in tool_uses {
//!         println!("Model wants to use: {}", tool_use.name);
//!
//!         // Execute tool manually and record the result
//!         let tool = client.get_tool(&tool_use.name).unwrap();
//!         let result = tool.execute(tool_use.input).await?;
//!         client.add_tool_result(&tool_use.id, result)?;
//!     }
//!
//!     // Continue the conversation now that every tool call has a result
//!     client.send("").await?;
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Automatic Tool Execution
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
//! use serde_json::json;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let calculator = Tool::new(
//!     "calculator",
//!     "Performs arithmetic operations",
//!     json!({"type": "object"}),
//!     |_input| Box::pin(async move { Ok(json!({"result": 42})) })
//! );
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .tools(vec![calculator])
//!     .auto_execute_tools(true) // Enable auto-execution
//!     .max_tool_iterations(5)   // Max 5 tool rounds
//!     .build()?)?;
//!
//! client.send("Calculate 2+2 and then multiply by 3").await?;
//!
//! // Tools are executed automatically; you only get the final text response
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## With Hooks
//!
//! ```ignore
//! use open_agent::{Client, AgentOptions, Hooks, HookDecision};
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let hooks = Hooks::new()
//!     .add_user_prompt_submit(|event| async move {
//!         // Block prompts containing certain words
//!         if event.prompt.contains("forbidden") {
//!             return Some(HookDecision::block("Forbidden word detected"));
//!         }
//!         Some(HookDecision::continue_())
//!     })
//!     .add_pre_tool_use(|event| async move {
//!         // Log all tool uses
//!         println!("Executing tool: {}", event.tool_name);
//!         Some(HookDecision::continue_())
//!     });
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .base_url("http://localhost:1234/v1")
//!     .hooks(hooks)
//!     .build()?)?;
//!
//! // The UserPromptSubmit hook runs as part of send()
//! client.send("Hello!").await?;
//! # Ok(())
//! # }
//! ```

use crate::types::{
    AgentOptions, ContentBlock, Message, MessageRole, OpenAIFunction, OpenAIMessage,
    OpenAIRequest, OpenAIToolCall, TextBlock,
};
use crate::utils::{ToolCallAggregator, parse_sse_stream};
use crate::{Error, Result};
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

/// A pinned, boxed stream of content blocks from the model.
///
/// This type alias represents an asynchronous stream that yields `ContentBlock` items.
/// Each item is wrapped in a `Result` to handle potential errors during streaming.
///
/// The stream is:
/// - **Pinned** (`Pin<Box<...>>`): The stream stays at a fixed heap address, as possibly
///   self-referential async state requires, and `Pin<Box<_>>` is itself `Unpin`, so
///   `StreamExt::next()` can be called on it directly
/// - **Boxed**: Allows dynamic dispatch and hides the concrete stream implementation
/// - **Send**: Can be safely transferred between threads
///
/// # Content Blocks
///
/// The stream can yield several types of content blocks:
///
/// - **TextBlock**: Incremental text responses from the model
/// - **ToolUseBlock**: Requests to execute a tool with specific parameters
/// - **ToolResultBlock**: Results from tool execution (in manual mode)
///
/// # Error Handling
///
/// Errors in the stream indicate issues like:
/// - Network failures or timeouts
/// - Malformed SSE events
/// - JSON parsing errors
/// - API errors from the model provider
///
/// An `Err` item does not necessarily end the stream; later items may still follow.
/// It is the caller's responsibility to decide how to react, for example by breaking
/// out of the loop on the first error.
///
/// # Examples
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions, ContentBlock};
/// use futures::StreamExt;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .build()?;
///
/// let mut stream = query("Hello!", &options).await?;
///
/// while let Some(result) = stream.next().await {
///     match result {
///         Ok(ContentBlock::Text(text)) => print!("{}", text.text),
///         Ok(_) => {}, // Other block types
///         Err(e) => eprintln!("Stream error: {}", e),
///     }
/// }
/// # Ok(())
/// # }
/// ```
pub type ContentStream = Pin<Box<dyn Stream<Item = Result<ContentBlock>> + Send>>;

/// Simple query function for single-turn interactions without conversation history.
///
/// This is a stateless convenience function for simple queries that don't require
/// multi-turn conversations. It creates a temporary HTTP client, sends a single
/// prompt, and returns a stream of content blocks.
///
/// For multi-turn conversations or more control over the interaction, use [`Client`] instead.
///
/// # Parameters
///
/// - `prompt`: The user's message to send to the model
/// - `options`: Configuration including model, API key, tools, etc.
///
/// # Returns
///
/// Returns a `ContentStream` that yields content blocks as they arrive from the model.
/// The stream must be polled to completion to receive all blocks.
///
/// # Behavior
///
/// 1. Creates a temporary HTTP client with configured timeout
/// 2. Builds message array (system prompt, if set, + user prompt)
/// 3. Converts tools to OpenAI format if provided
/// 4. Makes HTTP POST request to `{base_url}/chat/completions`
/// 5. Parses Server-Sent Events (SSE) response stream
/// 6. Aggregates chunks into complete content blocks
/// 7. Returns stream that yields blocks as they complete
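///
/// Steps 5 to 7 follow a `scan` / `filter_map` / `flat_map` pattern. The sketch below
/// shows the same shape with std iterators instead of async streams, so it runs without
/// a network or runtime. `ToyAggregator` is a hypothetical stand-in for
/// `ToolCallAggregator`: it buffers fragments and emits complete "blocks" only when a
/// fragment ends with `;`, an arbitrary rule chosen for illustration.
///
/// ```rust
/// // Hypothetical stand-in for `ToolCallAggregator`.
/// #[derive(Default)]
/// struct ToyAggregator {
///     pending: String,
/// }
///
/// impl ToyAggregator {
///     // Complete blocks, an empty Vec for an intermediate chunk, or an error.
///     fn process_chunk(&mut self, chunk: &str) -> Result<Vec<String>, String> {
///         if chunk.is_empty() {
///             return Err("empty chunk".to_string());
///         }
///         self.pending.push_str(chunk);
///         if !self.pending.ends_with(';') {
///             return Ok(Vec::new()); // Still incomplete: keep buffering
///         }
///         let blocks: Vec<String> = self
///             .pending
///             .split(';')
///             .filter(|s| !s.is_empty())
///             .map(str::to_string)
///             .collect();
///         self.pending.clear();
///         Ok(blocks)
///     }
/// }
///
/// fn aggregate(chunks: &[&str]) -> Vec<Result<String, String>> {
///     chunks
///         .iter()
///         // `scan` threads the aggregator state through the sequence.
///         .scan(ToyAggregator::default(), |agg, chunk| {
///             Some(match agg.process_chunk(chunk) {
///                 Ok(blocks) if blocks.is_empty() => None, // Intermediate chunk
///                 Ok(blocks) => Some(Ok(blocks)),           // Complete block(s)
///                 Err(e) => Some(Err(e)),                   // Propagate the error
///             })
///         })
///         // Drop the `None` entries produced by intermediate chunks.
///         .filter_map(|item| item)
///         // Expand each Vec of blocks into individual items.
///         .flat_map(|result| match result {
///             Ok(blocks) => blocks.into_iter().map(Ok).collect::<Vec<_>>(),
///             Err(e) => vec![Err(e)],
///         })
///         .collect()
/// }
///
/// fn main() {
///     let expected: Vec<Result<String, String>> =
///         vec![Ok("ab".into()), Ok("c".into()), Ok("d".into())];
///     assert_eq!(aggregate(&["a", "b;c", ";", "d;"]), expected);
/// }
/// ```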
///
/// # Error Handling
///
/// The function itself returns an error for:
/// - HTTP client creation failures
/// - Network errors during the request
/// - API errors (authentication, invalid model, rate limits, etc.)
///
/// SSE parsing and JSON deserialization errors are reported later, as `Err` items
/// yielded by the returned stream.
///
/// # Performance Notes
///
/// - Creates a new HTTP client for each call (consider using `Client` for repeated queries)
/// - Timeout is configurable via `AgentOptions::timeout` (default: 120 seconds) and covers
///   the whole request, including the time spent streaming the response body
/// - Streaming begins immediately; no buffering of the full response
///
/// # Examples
///
/// ## Basic Usage
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions};
/// use futures::StreamExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let options = AgentOptions::builder()
///         .system_prompt("You are a helpful assistant")
///         .model("gpt-4")
///         .api_key("sk-...")
///         .build()?;
///
///     let mut stream = query("What's the capital of France?", &options).await?;
///
///     while let Some(block) = stream.next().await {
///         match block? {
///             open_agent::ContentBlock::Text(text) => {
///                 print!("{}", text.text);
///             }
///             _ => {}
///         }
///     }
///
///     Ok(())
/// }
/// ```
///
/// ## With Tools
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions, Tool, ContentBlock};
/// use futures::StreamExt;
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs calculations",
///     json!({"type": "object"}),
///     |_input| Box::pin(async move { Ok(json!({"result": 42})) })
/// );
///
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .build()?;
///
/// let mut stream = query("Calculate 2+2", &options).await?;
///
/// while let Some(block) = stream.next().await {
///     match block? {
///         ContentBlock::ToolUse(tool_use) => {
///             println!("Model wants to use: {}", tool_use.name);
///             // Note: `query` is stateless, so it cannot continue the conversation
///             // with a tool result. Use `Client` for manual or automatic execution.
///         }
///         ContentBlock::Text(text) => print!("{}", text.text),
///         _ => {}
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Error Handling
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions};
/// use futures::StreamExt;
///
/// # async fn example() {
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("invalid-key")
///     .build()
///     .unwrap();
///
/// match query("Hello", &options).await {
///     Ok(mut stream) => {
///         while let Some(result) = stream.next().await {
///             match result {
///                 Ok(block) => println!("Block: {:?}", block),
///                 Err(e) => {
///                     eprintln!("Stream error: {}", e);
///                     break;
///                 }
///             }
///         }
///     }
///     Err(e) => eprintln!("Query failed: {}", e),
/// }
/// # }
/// ```
pub async fn query(prompt: &str, options: &AgentOptions) -> Result<ContentStream> {
    // Create HTTP client with configured timeout
    // The timeout applies to the entire request, not individual chunks
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(options.timeout()))
        .build()
        .map_err(Error::Http)?;

    // Build messages array for the API request
    // OpenAI format expects an array of message objects with role and content
    let mut messages = Vec::new();

    // Add system prompt if provided
    // System prompts set the assistant's behavior and context
    if !options.system_prompt().is_empty() {
        messages.push(OpenAIMessage {
            role: "system".to_string(),
            content: options.system_prompt().to_string(),
            tool_calls: None,
            tool_call_id: None,
        });
    }

    // Add user prompt
    // This is the actual query from the user
    messages.push(OpenAIMessage {
        role: "user".to_string(),
        content: prompt.to_string(),
        tool_calls: None,
        tool_call_id: None,
    });

    // Convert tools to OpenAI format if any are provided
    // Tools are described using JSON Schema for parameter validation
    let tools = if !options.tools().is_empty() {
        Some(
            options
                .tools()
                .iter()
                .map(|t| t.to_openai_format())
                .collect(),
        )
    } else {
        None
    };

    // Build the OpenAI-compatible request payload
    // stream=true enables Server-Sent Events for incremental responses
    let request = OpenAIRequest {
        model: options.model().to_string(),
        messages,
        stream: true, // Critical: enables SSE streaming
        max_tokens: options.max_tokens(),
        temperature: Some(options.temperature()),
        tools,
    };

    // Make HTTP POST request to the chat completions endpoint
    let url = format!("{}/chat/completions", options.base_url());
    let response = client
        .post(&url)
        .header("Authorization", format!("Bearer {}", options.api_key()))
        .header("Content-Type", "application/json")
        .json(&request)
        .send()
        .await
        .map_err(Error::Http)?;

    // Check for HTTP-level errors before processing the stream
    // This catches authentication failures, rate limits, invalid models, etc.
    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await.unwrap_or_else(|e| {
            eprintln!("WARNING: Failed to read error response body: {}", e);
            "Unknown error (failed to read response body)".to_string()
        });
        return Err(Error::api(format!("API error {}: {}", status, body)));
    }

    // Parse the Server-Sent Events (SSE) stream
    // The response body is a stream of "data: {...}" events
    let sse_stream = parse_sse_stream(response);

    // Aggregate SSE chunks into complete content blocks
    // ToolCallAggregator handles partial JSON and assembles complete tool calls
    // The scan() combinator maintains state across stream items
    let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
        let result = match chunk_result {
            Ok(chunk) => match aggregator.process_chunk(chunk) {
                Ok(blocks) => {
                    if blocks.is_empty() {
                        Some(None) // Intermediate chunk, continue streaming
                    } else {
                        Some(Some(Ok(blocks))) // Complete block(s) ready
                    }
                }
                Err(e) => Some(Some(Err(e))), // Propagate processing error
            },
            Err(e) => Some(Some(Err(e))), // Propagate stream error
        };
        futures::future::ready(result)
    });

    // Flatten the stream to emit individual blocks
    // filter_map removes None values (incomplete chunks)
    // flat_map expands Vec<ContentBlock> into individual items
    let flattened = stream
        .filter_map(|item| async move { item })
        .flat_map(|result| {
            futures::stream::iter(match result {
                Ok(blocks) => blocks.into_iter().map(Ok).collect(),
                Err(e) => vec![Err(e)],
            })
        });

    // Pin and box the stream for type erasure and safe async usage
    Ok(Box::pin(flattened))
}

/// Stateful client for multi-turn conversations with automatic history management.
///
/// The `Client` is the primary interface for building conversational AI applications.
/// It maintains conversation history, manages streaming responses, and provides two
/// modes of operation: manual and automatic tool execution.
///
/// # State Management
///
/// The client maintains several pieces of state that persist across multiple turns:
///
/// - **Conversation History**: Complete record of all messages exchanged
/// - **Active Stream**: Currently active SSE stream being consumed
/// - **Interrupt Flag**: Thread-safe cancellation signal
/// - **Auto-Execution Buffer**: Cached blocks for auto-execution mode
///
/// # Operating Modes
///
/// ## Manual Mode (default)
///
/// In manual mode, the client streams blocks directly to the caller. When the model
/// requests a tool, you receive a `ToolUseBlock`, execute the tool yourself, add the
/// result with `add_tool_result()`, and continue the conversation.
///
/// **Advantages**:
/// - Full control over tool execution
/// - Custom error handling per tool
/// - Ability to modify tool inputs/outputs
/// - Interactive debugging capabilities
///
/// ## Automatic Mode (`auto_execute_tools = true`)
///
/// In automatic mode, the client executes tools transparently and only returns the
/// final text response after all tool iterations complete.
///
/// **Advantages**:
/// - Simpler API for common use cases
/// - PreToolUse/PostToolUse hooks for inspecting or modifying each tool call
/// - Automatic conversation continuation
/// - Configurable iteration limits
///
/// # Thread Safety
///
/// The client is not designed for concurrent use: a single task should drive `send()`
/// and `receive()`. However, the interrupt flag is an `Arc<AtomicBool>`, which can be
/// safely shared across threads to signal cancellation.
///
/// # Memory Management
///
/// - History grows unbounded by default (consider clearing it periodically)
/// - Streams are consumed lazily (low memory footprint during streaming)
/// - Auto-execution buffers the entire response (higher memory use in auto mode)
///
/// # Examples
///
/// ## Basic Multi-Turn Conversation
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .build()?)?;
///
/// // First question
/// client.send("What's the capital of France?").await?;
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "Paris is the capital of France."
///     }
/// }
///
/// // Follow-up question: history is automatically maintained
/// client.send("What's its population?").await?;
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "Paris has approximately 2.2 million people."
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Manual Tool Execution
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs arithmetic",
///     json!({"type": "object"}),
///     |_input| Box::pin(async move { Ok(json!({"result": 42})) })
/// );
///
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .build()?)?;
///
/// client.send("What's 2+2?").await?;
///
/// loop {
///     // Drain the response before continuing: `send()` replaces the active
///     // stream, so calling it mid-stream would drop any remaining blocks.
///     let mut tool_uses = Vec::new();
///     while let Some(block) = client.receive().await? {
///         match block {
///             ContentBlock::ToolUse(tool_use) => tool_uses.push(tool_use),
///             ContentBlock::Text(text) => println!("{}", text.text), // "The result is 4."
///             _ => {}
///         }
///     }
///
///     if tool_uses.is_empty() {
///         break; // No tool requests: the model has answered
///     }
///
///     for tool_use in tool_uses {
///         // Execute tool manually
///         let result = json!({"result": 4});
///         client.add_tool_result(&tool_use.id, result)?;
///     }
///
///     // Continue conversation to get model's response
///     client.send("").await?;
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Automatic Tool Execution
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs arithmetic",
///     json!({"type": "object"}),
///     |_input| Box::pin(async move { Ok(json!({"result": 42})) })
/// );
///
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .auto_execute_tools(true) // Enable auto-execution
///     .build()?)?;
///
/// client.send("What's 2+2?").await?;
///
/// // Tools execute automatically; you only receive the final text
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "The result is 4."
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## With Interruption
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions};
/// use std::time::Duration;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::new(AgentOptions::default())?;
///
/// // Start a long-running query
/// client.send("Write a very long story").await?;
///
/// // Spawn a task to interrupt after timeout
/// let interrupt_handle = client.interrupt_handle();
/// tokio::spawn(async move {
///     tokio::time::sleep(Duration::from_secs(5)).await;
///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
/// });
///
/// // This loop will stop when interrupted
/// while let Some(_block) = client.receive().await? {
///     // Process blocks...
/// }
///
/// // Client is still usable after interruption
/// client.send("What's 2+2?").await?;
/// # Ok(())
/// # }
/// ```
pub struct Client {
    /// Configuration options including model, API key, tools, hooks, etc.
    ///
    /// This field contains all the settings that control how the client behaves.
    /// It's set once during construction and cannot be modified (though you can
    /// access it via `options()` for inspection).
    options: AgentOptions,

    /// Complete conversation history as a sequence of messages.
    ///
    /// Each message contains a role (System/User/Assistant/Tool) and content blocks.
    /// History grows unbounded by default; use `clear_history()` to reset it.
    ///
    /// **Important**: The history includes ALL messages, not just user and assistant
    /// turns: it also contains tool results and the intermediate assistant messages
    /// that carry tool calls.
    history: Vec<Message>,

    /// Currently active SSE stream being consumed.
    ///
    /// This is `Some(stream)` while a response is being received, and `None` when
    /// no request is in flight or after a response completes.
    ///
    /// The stream is set by `send()` and consumed by `receive()`. When the stream
    /// is exhausted, `receive()` returns `Ok(None)` and sets this back to `None`.
    current_stream: Option<ContentStream>,

    /// Reusable HTTP client for making API requests.
    ///
    /// Configured once during construction with the timeout from `AgentOptions`.
    /// Reusing the same client across requests enables connection pooling and
    /// better performance for multi-turn conversations.
    http_client: reqwest::Client,

    /// Thread-safe interrupt flag for cancellation.
    ///
    /// This `Arc<AtomicBool>` can be cloned and shared across threads or async tasks
    /// to signal cancellation. When set to `true`, the next `receive()` call will
    /// return `Ok(None)` and clear the current stream.
    ///
    /// The flag is automatically reset to `false` at the start of each `send()` call.
    ///
    /// **Thread Safety**: Can be safely accessed from multiple threads using atomic
    /// operations. However, only one thread should call `send()`/`receive()`.
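    ///
    /// The check-and-reset protocol described above can be sketched with std alone.
    /// This is an illustration rather than the client's code: the hypothetical
    /// `consume` function stands in for a `receive()` loop, and clearing the flag by
    /// hand stands in for what `send()` does.
    ///
    /// ```rust
    /// use std::sync::Arc;
    /// use std::sync::atomic::{AtomicBool, Ordering};
    /// use std::thread;
    ///
    /// // Hypothetical consumer: takes items until the shared flag is set.
    /// fn consume(items: impl Iterator<Item = u32>, interrupted: &AtomicBool) -> Vec<u32> {
    ///     let mut out = Vec::new();
    ///     for item in items {
    ///         if interrupted.load(Ordering::SeqCst) {
    ///             break; // Analogous to `receive()` returning `Ok(None)`
    ///         }
    ///         out.push(item);
    ///     }
    ///     out
    /// }
    ///
    /// fn main() {
    ///     let flag = Arc::new(AtomicBool::new(false));
    ///     let handle = Arc::clone(&flag); // What an interrupt handle shares
    ///     // Another thread raises the flag; `join` makes the example deterministic.
    ///     thread::spawn(move || handle.store(true, Ordering::SeqCst)).join().unwrap();
    ///     assert!(consume(1..=3, &flag).is_empty());
    ///
    ///     // Resetting the flag (as `send()` does) lets the next consumer run to completion.
    ///     flag.store(false, Ordering::SeqCst);
    ///     assert_eq!(consume(1..=3, &flag), vec![1, 2, 3]);
    /// }
    /// ```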
    interrupted: Arc<AtomicBool>,

    /// Buffer of content blocks for auto-execution mode.
    ///
    /// When `auto_execute_tools` is enabled, `receive()` internally calls the
    /// auto-execution loop, which buffers all final text blocks here. Subsequent
    /// calls to `receive()` return blocks from this buffer one at a time.
    ///
    /// **Only used when `options.auto_execute_tools == true`**.
    ///
    /// The buffer is cleared when starting a new auto-execution loop.
    auto_exec_buffer: Vec<ContentBlock>,

    /// Current read position in the auto-execution buffer.
    ///
    /// Tracks which block to return next when `receive()` is called in auto mode.
    /// Reset to 0 when the buffer is refilled with a new response.
    ///
    /// **Only used when `options.auto_execute_tools == true`**.
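    ///
    /// Together, the buffer and this index act as a simple replay cursor, sketched
    /// below. The sketch is illustrative only: `ReplayBuffer` is a hypothetical type,
    /// and cloning each block out of the buffer is an assumption (the client may move
    /// blocks out instead).
    ///
    /// ```rust
    /// // Hypothetical stand-in for the `auto_exec_buffer` + `auto_exec_index` pair.
    /// struct ReplayBuffer<T: Clone> {
    ///     buffer: Vec<T>,
    ///     index: usize,
    /// }
    ///
    /// impl<T: Clone> ReplayBuffer<T> {
    ///     // Replaces the contents after a new auto-execution loop and rewinds to 0.
    ///     fn refill(&mut self, blocks: Vec<T>) {
    ///         self.buffer = blocks;
    ///         self.index = 0;
    ///     }
    ///
    ///     // Returns the next buffered block, or `None` once the buffer is drained.
    ///     fn next_block(&mut self) -> Option<T> {
    ///         let block = self.buffer.get(self.index).cloned()?;
    ///         self.index += 1;
    ///         Some(block)
    ///     }
    /// }
    ///
    /// fn main() {
    ///     let mut replay = ReplayBuffer { buffer: Vec::new(), index: 0 };
    ///     replay.refill(vec!["Paris", "is the capital."]);
    ///     assert_eq!(replay.next_block(), Some("Paris"));
    ///     assert_eq!(replay.next_block(), Some("is the capital."));
    ///     assert_eq!(replay.next_block(), None);
    /// }
    /// ```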
891 auto_exec_index: usize,
892}
893
894impl Client {
895 /// Creates a new client with the specified configuration.
896 ///
897 /// This constructor initializes all state fields and creates a reusable HTTP client
898 /// configured with the timeout from `AgentOptions`.
899 ///
900 /// # Parameters
901 ///
902 /// - `options`: Configuration including model, API key, tools, hooks, etc.
903 ///
904 /// # Errors
905 ///
906 /// Returns an error if the HTTP client cannot be built. This can happen due to:
907 /// - Invalid TLS configuration
908 /// - System resource exhaustion
909 /// - Invalid timeout values
910 ///
911 /// # Examples
912 ///
913 /// ```rust
914 /// use open_agent::{Client, AgentOptions};
915 ///
916 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
917 /// let client = Client::new(AgentOptions::builder()
918 /// .model("gpt-4")
919 /// .base_url("http://localhost:1234/v1")
920 /// .build()?)?;
921 /// # Ok(())
922 /// # }
923 /// ```
924 pub fn new(options: AgentOptions) -> Result<Self> {
925 // Build HTTP client with configured timeout
926 // This client is reused across all requests for connection pooling
927 let http_client = reqwest::Client::builder()
928 .timeout(Duration::from_secs(options.timeout()))
929 .build()
930 .map_err(|e| Error::config(format!("Failed to build HTTP client: {}", e)))?;
931
932 Ok(Self {
933 options,
934 history: Vec::new(), // Empty conversation history
935 current_stream: None, // No active stream yet
936 http_client,
937 interrupted: Arc::new(AtomicBool::new(false)), // Not interrupted initially
938 auto_exec_buffer: Vec::new(), // Empty buffer for auto mode
939 auto_exec_index: 0, // Start at beginning of buffer
940 })
941 }
942
943 /// Sends a user message and initiates streaming of the model's response.
944 ///
945 /// This method performs the following steps in order:
946 ///
947 /// 1. Executes UserPromptSubmit hooks (which can modify or block the prompt)
948 /// 2. Adds the user message to conversation history
949 /// 3. Builds and sends HTTP request to the OpenAI-compatible API
950 /// 4. Parses the SSE stream and sets up aggregation
951 /// 5. Stores the stream for consumption via `receive()`
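/**
Step 3 turns each stored history message into OpenAI-format messages using three cases: a message holding tool results becomes one `tool` message per result, a message holding tool-use blocks becomes a single `assistant` message with `tool_calls` (any text joined by newlines as its content), and a text-only message keeps its stored role with newline-joined text. The following is a minimal std-only sketch of those rules; `Block`, `WireMessage`, and `to_wire` are hypothetical simplifications of `ContentBlock` and `OpenAIMessage`, not SDK types.

```rust
// Hypothetical stand-in for the SDK's content blocks.
enum Block {
    Text(String),
    ToolUse { id: String, name: String },
    ToolResult { id: String, content: String },
}

// Hypothetical stand-in for an OpenAI-style chat message.
#[derive(Debug, PartialEq)]
struct WireMessage {
    role: String,
    content: String,
    tool_calls: Vec<(String, String)>, // (id, name) pairs on an assistant message
    tool_call_id: Option<String>,      // id answered by a "tool" message
}

// Converts one stored message into the wire messages it produces.
fn to_wire(role: &str, blocks: &[Block]) -> Vec<WireMessage> {
    let mut texts = Vec::new();
    let mut calls = Vec::new();
    let mut results = Vec::new();
    for block in blocks {
        match block {
            Block::Text(t) => texts.push(t.as_str()),
            Block::ToolUse { id, name } => calls.push((id.clone(), name.clone())),
            Block::ToolResult { id, content } => results.push((id.clone(), content.clone())),
        }
    }
    if !results.is_empty() {
        // Case 1: one "tool" message per result, keyed by the call it answers
        return results
            .into_iter()
            .map(|(id, content)| WireMessage {
                role: "tool".to_string(),
                content,
                tool_calls: Vec::new(),
                tool_call_id: Some(id),
            })
            .collect();
    }
    // Case 2 (tool calls present) forces the assistant role; case 3 keeps the stored role
    let role = if calls.is_empty() { role } else { "assistant" };
    vec![WireMessage {
        role: role.to_string(),
        content: texts.join("\n"),
        tool_calls: calls,
        tool_call_id: None,
    }]
}
```

Because tool results are checked first, a message mixing text and tool results contributes only its `tool` messages, which matches the branch order in `send()`.
*/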
952 ///
953 /// # Parameters
954 ///
955 /// - `prompt`: The user's message. Can be empty to continue conversation after
956 /// adding tool results (common pattern in manual tool execution mode).
957 ///
958 /// # Returns
959 ///
960 /// - `Ok(())`: Request sent successfully, call `receive()` to get blocks
961 /// - `Err(e)`: Request failed (network error, API error, hook blocked, etc.)
962 ///
963 /// # Behavior Details
964 ///
965 /// ## Hook Execution
966 ///
967 /// Before sending, UserPromptSubmit hooks are executed. Hooks can:
968 /// - Modify the prompt text
969 /// - Block the request entirely
970 /// - Access conversation history
971 ///
972 /// If a hook blocks the request, this method returns an error immediately.
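/**
The hook step reduces to a small decision function: a blocking decision becomes an error, and otherwise a modified prompt, if any, replaces the original. A minimal sketch, assuming a hypothetical `PromptDecision` struct with public fields in place of the SDK's decision type (which exposes `continue_execution()`, `reason()`, and `modified_prompt()` accessors); `apply_prompt_hook` is likewise illustrative.

```rust
// Hypothetical stand-in for the decision returned by UserPromptSubmit hooks.
struct PromptDecision {
    continue_execution: bool,
    reason: Option<String>,
    modified_prompt: Option<String>,
}

// Returns the prompt to send, or an error if a hook blocked it.
fn apply_prompt_hook(prompt: &str, decision: Option<PromptDecision>) -> Result<String, String> {
    let mut final_prompt = prompt.to_string();
    if let Some(d) = decision {
        if !d.continue_execution {
            // Blocking wins over any modification
            return Err(format!(
                "Prompt blocked by hook: {}",
                d.reason.as_deref().unwrap_or("")
            ));
        }
        if let Some(modified) = d.modified_prompt {
            final_prompt = modified;
        }
    }
    Ok(final_prompt)
}
```
*/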
973 ///
974 /// ## History Management
975 ///
976 /// The prompt (after any hook modifications) is added to history before the request
977 /// is sent, so it stays in history even if the request fails. Blocked prompts are not added.
978 ///
979 /// ## Stream Setup
980 ///
981 /// The response stream is set up but not consumed. You must call `receive()`
982 /// repeatedly to get content blocks. The stream remains active until:
983 /// - All blocks are consumed (stream naturally ends)
984 /// - An error occurs
985 /// - Interrupt is triggered
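/**
Step 4's aggregation is a `scan` whose state survives across chunks: it yields `None` while a tool call's JSON arguments are still arriving in pieces and `Some(blocks)` once something is complete, after which `filter_map` and `flat_map` drop the partial steps and emit blocks one at a time. The same shape can be sketched with synchronous std iterators, using `flatten` twice in place of `filter_map` and `flat_map`; the `Chunk` type and `aggregate` function are hypothetical and much simpler than `ToolCallAggregator`.

```rust
// Hypothetical SSE chunk: a text delta, a fragment of tool-call arguments,
// or the signal that the current tool call is complete.
enum Chunk {
    Text(String),
    ArgFragment(String),
    ToolCallDone,
}

// Aggregates chunks into complete blocks, emitting nothing for partial fragments.
fn aggregate(chunks: Vec<Chunk>) -> Vec<String> {
    chunks
        .into_iter()
        .scan(String::new(), |pending_args, chunk| {
            let ready: Option<Vec<String>> = match chunk {
                Chunk::Text(t) => Some(vec![format!("text:{t}")]),
                Chunk::ArgFragment(f) => {
                    pending_args.push_str(&f);
                    None // partial: keep aggregating
                }
                // Complete: hand over the accumulated arguments and reset the state
                Chunk::ToolCallDone => Some(vec![format!("tool:{}", std::mem::take(pending_args))]),
            };
            Some(ready) // outer Some keeps scan running; inner Option marks completeness
        })
        .flatten() // drop partial (None) steps
        .flatten() // emit each block of a Vec individually
        .collect()
}
```
*/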
986 ///
987 /// ## Interrupt Handling
988 ///
989 /// The interrupt flag is reset to `false` at the start of this method,
990 /// allowing a fresh request after a previous interruption.
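/**
The interrupt contract spans `send()` and `receive()`: the flag is shared through an `Arc<AtomicBool>`, cleared when new work starts, and checked before each item is handed out. A minimal synchronous sketch of that contract, with a hypothetical `Session` type whose `remaining` vector stands in for the response stream.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for the client's interrupt handling.
struct Session {
    interrupted: Arc<AtomicBool>,
    remaining: Vec<u32>,
}

impl Session {
    // Like send(): clear any earlier interrupt before starting new work
    fn start(&mut self, items: Vec<u32>) {
        self.interrupted.store(false, Ordering::SeqCst);
        self.remaining = items;
    }

    // Like receive(): check the flag before yielding each item
    fn next_item(&mut self) -> Option<u32> {
        if self.interrupted.load(Ordering::SeqCst) {
            self.remaining.clear(); // discard pending work, as receive() drops the stream
            return None;
        }
        if self.remaining.is_empty() {
            None
        } else {
            Some(self.remaining.remove(0))
        }
    }

    // A handle another thread can keep and use to interrupt
    fn interrupt_handle(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.interrupted)
    }
}
```

Calling `start` again after an interrupt makes `next_item` yield items again, matching the reset described above.
*/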
991 ///
992 /// # State Changes
993 ///
994 /// - Resets `interrupted` flag to `false`
995 /// - Appends the (possibly hook-modified) user message to `history` unless a hook blocks it
996 /// - Sets `current_stream` to the new SSE stream once the request succeeds
997 /// - Does NOT modify `auto_exec_buffer` or `auto_exec_index`
998 ///
999 /// # Errors
1000 ///
1001 /// Returns errors for:
1002 /// - Hook blocking the prompt
1003 /// - HTTP client errors (network failure, DNS, etc.)
1004 /// - API errors (auth failure, invalid model, rate limits)
1005 /// - Invalid response format
1006 ///
1007 /// After an error, the client remains usable for new requests.
1008 ///
1009 /// # Examples
1010 ///
1011 /// ## Basic Usage
1012 ///
1013 /// ```rust,no_run
1014 /// # use open_agent::{Client, AgentOptions};
1015 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1016 /// # let mut client = Client::new(AgentOptions::default())?;
1017 /// client.send("Hello!").await?;
1018 ///
1019 /// while let Some(block) = client.receive().await? {
1020 /// // Process blocks...
1021 /// }
1022 /// # Ok(())
1023 /// # }
1024 /// ```
1025 ///
1026 /// ## Continuing After Tool Result
1027 ///
1028 /// ```rust,no_run
1029 /// # use open_agent::{Client, AgentOptions, ContentBlock};
1030 /// # use serde_json::json;
1031 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1032 /// # let mut client = Client::new(AgentOptions::default())?;
1033 /// client.send("Use the calculator").await?;
1034 ///
1035 /// while let Some(block) = client.receive().await? {
1036 /// if let ContentBlock::ToolUse(tool_use) = block {
1037 /// // Execute tool and add result
1038 /// client.add_tool_result(&tool_use.id, json!({"result": 42}))?;
1039 ///
1040 /// // Continue conversation with empty prompt
1041 /// client.send("").await?;
1042 /// }
1043 /// }
1044 /// # Ok(())
1045 /// # }
1046 /// ```
1047 pub async fn send(&mut self, prompt: &str) -> Result<()> {
1048 use crate::hooks::UserPromptSubmitEvent;
1049
1050 // Reset interrupt flag for new query
1051 // This allows the client to be reused after a previous interruption
1052 // Uses SeqCst ordering to ensure visibility across all threads
1053 self.interrupted.store(false, Ordering::SeqCst);
1054
1055 // Execute UserPromptSubmit hooks
1056 // Hooks run BEFORE adding to history, allowing modification or blocking
1057 let mut final_prompt = prompt.to_string();
1058 let history_snapshot: Vec<serde_json::Value> = self
1059 .history
1060 .iter()
1061 .map(|_| serde_json::json!({})) // Placeholder: one empty object per message, contents not serialized
1062 .collect();
1063
1064 // Create hook event with current prompt and history
1065 let event = UserPromptSubmitEvent::new(final_prompt.clone(), history_snapshot);
1066
1067 // Execute all registered UserPromptSubmit hooks
1068 if let Some(decision) = self.options.hooks().execute_user_prompt_submit(event).await {
1069 // Check if hook wants to block execution
1070 if !decision.continue_execution() {
1071 return Err(Error::other(format!(
1072 "Prompt blocked by hook: {}",
1073 decision.reason().unwrap_or("")
1074 )));
1075 }
1076 // Apply any prompt modifications from hooks
1077 if let Some(modified) = decision.modified_prompt() {
1078 final_prompt = modified.to_string();
1079 }
1080 }
1081
1082 // Add user message to history BEFORE sending the request, so the prompt
1083 // stays in history even if the request below fails. Empty prompts (used to
1084 // continue after tool results) are added as well
1085 self.history.push(Message::user(final_prompt));
1086
1087 // Build messages array for API request
1088 // This includes system prompt + full conversation history
1089 let mut messages = Vec::new();
1090
1091 // Add system prompt as first message if configured
1092 // System prompts are added fresh for each request (not from history)
1093 if !self.options.system_prompt().is_empty() {
1094 messages.push(OpenAIMessage {
1095 role: "system".to_string(),
1096 content: self.options.system_prompt().to_string(),
1097 tool_calls: None,
1098 tool_call_id: None,
1099 });
1100 }
1101
1102 // Convert conversation history to OpenAI message format
1103 // This includes user prompts, assistant responses, and tool results
1104 for msg in &self.history {
1105 // Separate blocks by type to determine message structure
1106 let mut text_blocks = Vec::new();
1107 let mut tool_use_blocks = Vec::new();
1108 let mut tool_result_blocks = Vec::new();
1109
1110 for block in &msg.content {
1111 match block {
1112 ContentBlock::Text(text) => text_blocks.push(text),
1113 ContentBlock::ToolUse(tool_use) => tool_use_blocks.push(tool_use),
1114 ContentBlock::ToolResult(tool_result) => tool_result_blocks.push(tool_result),
1115 }
1116 }
1117
1118 // Handle different message types based on content blocks
1119 // Case 1: Message contains tool results (should be separate tool messages)
1120 if !tool_result_blocks.is_empty() {
1121 for tool_result in tool_result_blocks {
1122 // Serialize the tool result content as JSON string
1123 let content = serde_json::to_string(&tool_result.content)
1124 .unwrap_or_else(|e| {
1125 format!("{{\"error\": \"Failed to serialize: {}\"}}", e)
1126 });
1127
1128 messages.push(OpenAIMessage {
1129 role: "tool".to_string(),
1130 content,
1131 tool_calls: None,
1132 tool_call_id: Some(tool_result.tool_use_id.clone()),
1133 });
1134 }
1135 }
1136 // Case 2: Message contains tool use blocks (assistant with tool calls)
1137 else if !tool_use_blocks.is_empty() {
1138 // Build tool_calls array
1139 let tool_calls: Vec<OpenAIToolCall> = tool_use_blocks
1140 .iter()
1141 .map(|tool_use| {
1142 // Serialize the input as a JSON string (OpenAI API requirement)
1143 let arguments = serde_json::to_string(&tool_use.input)
1144 .unwrap_or_else(|_| "{}".to_string());
1145
1146 OpenAIToolCall {
1147 id: tool_use.id.clone(),
1148 call_type: "function".to_string(),
1149 function: OpenAIFunction {
1150 name: tool_use.name.clone(),
1151 arguments,
1152 },
1153 }
1154 })
1155 .collect();
1156
1157 // Extract any text content (some models include reasoning before tool calls)
1158 let content = text_blocks
1159 .iter()
1160 .map(|t| t.text.as_str())
1161 .collect::<Vec<_>>()
1162 .join("\n");
1163
1164 messages.push(OpenAIMessage {
1165 role: "assistant".to_string(),
1166 content,
1167 tool_calls: Some(tool_calls),
1168 tool_call_id: None,
1169 });
1170 }
1171 // Case 3: Message contains only text (normal message)
1172 else {
1173 let content = text_blocks
1174 .iter()
1175 .map(|t| t.text.as_str())
1176 .collect::<Vec<_>>()
1177 .join("\n");
1178
1179 let role_str = match msg.role {
1180 MessageRole::System => "system",
1181 MessageRole::User => "user",
1182 MessageRole::Assistant => "assistant",
1183 MessageRole::Tool => "tool",
1184 };
1185
1186 messages.push(OpenAIMessage {
1187 role: role_str.to_string(),
1188 content,
1189 tool_calls: None,
1190 tool_call_id: None,
1191 });
1192 }
1193 }
1194
1195 // Convert tools to OpenAI format if any are registered
1196 // Each tool is described with name, description, and JSON Schema parameters
1197 let tools = if !self.options.tools().is_empty() {
1198 Some(
1199 self.options
1200 .tools()
1201 .iter()
1202 .map(|t| t.to_openai_format())
1203 .collect(),
1204 )
1205 } else {
1206 None
1207 };
1208
1209 // Build the OpenAI-compatible request payload
1210 let request = OpenAIRequest {
1211 model: self.options.model().to_string(),
1212 messages,
1213 stream: true, // Always stream for progressive rendering
1214 max_tokens: self.options.max_tokens(),
1215 temperature: Some(self.options.temperature()),
1216 tools,
1217 };
1218
1219 // Make HTTP POST request to chat completions endpoint
1220 let url = format!("{}/chat/completions", self.options.base_url());
1221 let response = self
1222 .http_client
1223 .post(&url)
1224 .header(
1225 "Authorization",
1226 format!("Bearer {}", self.options.api_key()),
1227 )
1228 .header("Content-Type", "application/json")
1229 .json(&request)
1230 .send()
1231 .await
1232 .map_err(Error::Http)?;
1233
1234 // Check for HTTP-level errors before processing stream
1235 // This catches authentication, rate limits, invalid models, etc.
1236 if !response.status().is_success() {
1237 let status = response.status();
1238 let body = response.text().await.unwrap_or_else(|e| {
1239 eprintln!("WARNING: Failed to read error response body: {}", e);
1240 "Unknown error (failed to read response body)".to_string()
1241 });
1242 return Err(Error::api(format!("API error {}: {}", status, body)));
1243 }
1244
1245 // Parse Server-Sent Events (SSE) stream from response
1246 let sse_stream = parse_sse_stream(response);
1247
1248 // Aggregate SSE chunks into complete content blocks
1249 // ToolCallAggregator maintains state to handle incremental JSON chunks
1250 // that may arrive split across multiple SSE events
1251 let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
1252 let result = match chunk_result {
1253 Ok(chunk) => match aggregator.process_chunk(chunk) {
1254 Ok(blocks) => {
1255 if blocks.is_empty() {
1256 Some(None) // Partial chunk, keep aggregating
1257 } else {
1258 Some(Some(Ok(blocks))) // Complete block(s) ready
1259 }
1260 }
1261 Err(e) => Some(Some(Err(e))), // Processing error
1262 },
1263 Err(e) => Some(Some(Err(e))), // Stream error
1264 };
1265 futures::future::ready(result)
1266 });
1267
1268 // Flatten the stream to emit individual blocks
1269 // filter_map removes None values (partial chunks)
1270 // flat_map converts Vec<ContentBlock> to individual items
1271 let flattened = stream
1272 .filter_map(|item| async move { item })
1273 .flat_map(|result| {
1274 futures::stream::iter(match result {
1275 Ok(blocks) => blocks.into_iter().map(Ok).collect(),
1276 Err(e) => vec![Err(e)],
1277 })
1278 });
1279
1280 // Store the stream for consumption via receive()
1281 // The stream is NOT consumed here - that happens in receive()
1282 self.current_stream = Some(Box::pin(flattened));
1283
1284 Ok(())
1285 }
1286
1287 /// Internal method that returns one block from the current stream.
1288 ///
1289 /// This is the core streaming logic extracted for reuse by both manual mode
1290 /// and auto-execution mode. It handles interrupt checking and stream consumption.
1291 ///
1292 /// # Returns
1293 ///
1294 /// - `Ok(Some(block))`: Successfully received a content block
1295 /// - `Ok(None)`: No active stream, the stream ended, or an interrupt was detected
1296 /// - `Err(e)`: An error occurred during streaming
1297 ///
1298 /// # State Changes
1299 ///
1300 /// - Sets `current_stream` to `None` if interrupted or stream ends
1301 /// - Does not modify history or other state
1302 ///
1303 /// # Implementation Notes
1304 ///
1305 /// This method checks the interrupt flag on every call, allowing responsive
1306 /// cancellation. The check uses SeqCst ordering for immediate visibility of
1307 /// interrupts from other threads.
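/**
Clearing `current_stream` when the stream ends, as the state-change notes describe, guarantees that a finished stream is never polled again; the `futures` `Stream` trait places no requirements on what a stream does if it is polled after returning `None`. A synchronous sketch of that behavior, with a hypothetical `Receiver` wrapping a std iterator and a plain `bool` standing in for the atomic interrupt flag.

```rust
// Hypothetical stand-in for receive_one(): `current` plays the role of the
// optional response stream.
struct Receiver<I: Iterator> {
    current: Option<I>,
}

impl<I: Iterator> Receiver<I> {
    fn receive_one(&mut self, interrupted: bool) -> Option<I::Item> {
        if interrupted {
            self.current = None; // discard the rest of the response
            return None;
        }
        // With no active source there is nothing to yield
        let next = self.current.as_mut()?.next();
        if next.is_none() {
            self.current = None; // ended: drop it so it is never polled again
        }
        next
    }
}
```
*/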
1308 async fn receive_one(&mut self) -> Result<Option<ContentBlock>> {
1309 // Check interrupt flag before attempting to receive
1310 // Uses SeqCst to ensure we see the latest value from any thread
1311 if self.interrupted.load(Ordering::SeqCst) {
1312 // Clear the stream and return None to signal completion
1313 self.current_stream = None;
1314 return Ok(None);
1315 }
1316
1317 // Poll the current stream; with no active stream there is nothing to read
1318 let next = match self.current_stream.as_mut() {
1319 Some(stream) => stream.next().await,
1320 None => return Ok(None),
1321 };
1322 match next {
1323 Some(Ok(block)) => Ok(Some(block)), // Got a block
1324 Some(Err(e)) => Err(e), // Stream error
1325 // Stream ended: drop it so a finished stream is never polled again
1326 None => { self.current_stream = None; Ok(None) }
1327 }
1328 }
1329
1330 /// Collects all blocks from the current stream into a vector.
1331 ///
1332 /// Internal helper for auto-execution mode. This method buffers the entire
1333 /// response in memory, which is necessary to determine if the response contains
1334 /// tool calls before returning anything to the caller.
1335 ///
1336 /// # Returns
1337 ///
1338 /// - `Ok(vec)`: Successfully collected all blocks
1339 /// - `Err(e)`: Error during collection or interrupted
1340 ///
1341 /// # Memory Usage
1342 ///
1343 /// This buffers the entire response, which can be large for long completions.
1344 /// Consider the memory implications when using auto-execution mode.
1345 ///
1346 /// # Interruption
1347 ///
1348 /// Checks interrupt flag during collection and returns error if interrupted.
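/**
Buffering is what lets the auto-execution loop inspect a whole response for tool calls before returning anything. A minimal sketch of the collection step, assuming a synchronous `source` closure in place of `receive_one()` and a borrowed `AtomicBool` in place of the client's flag; `collect_all` is a hypothetical name.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Drains `source` into a Vec, failing if the flag is set mid-collection.
fn collect_all<T>(
    mut source: impl FnMut() -> Option<T>,
    interrupted: &AtomicBool,
) -> Result<Vec<T>, String> {
    let mut items = Vec::new();
    while let Some(item) = source() {
        // Checked after each item arrives, before it is kept
        if interrupted.load(Ordering::SeqCst) {
            return Err("interrupted during collection".to_string());
        }
        items.push(item);
    }
    Ok(items)
}
```

As in `collect_all_blocks()`, an interrupt raised mid-response discards the partial result instead of returning it.
*/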
1349 async fn collect_all_blocks(&mut self) -> Result<Vec<ContentBlock>> {
1350 let mut blocks = Vec::new();
1351
1352 // Consume entire stream into vector
1353 while let Some(block) = self.receive_one().await? {
1354 // Check interrupt during collection for responsiveness
1355 if self.interrupted.load(Ordering::SeqCst) {
1356 self.current_stream = None;
1357 return Err(Error::other(
1358 "Operation interrupted during block collection",
1359 ));
1360 }
1361
1362 blocks.push(block);
1363 }
1364
1365 Ok(blocks)
1366 }
1367
1368 /// Executes a tool by name with the given input.
1369 ///
1370 /// Internal helper for auto-execution mode. Looks up the tool in the registered
1371 /// tools list and executes it with the provided input.
1372 ///
1373 /// # Parameters
1374 ///
1375 /// - `tool_name`: Name of the tool to execute
1376 /// - `input`: JSON value containing tool parameters
1377 ///
1378 /// # Returns
1379 ///
1380 /// - `Ok(result)`: Tool executed successfully, returns result as JSON
1381 /// - `Err(e)`: Tool not found or execution failed
1382 ///
1383 /// # Error Handling
1384 ///
1385 /// If the tool is not found in the registry, returns a ToolError.
1386 /// If execution fails, the error from the tool is propagated.
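/**
The lookup is a linear search by exact name that turns a miss into an error before anything runs. A minimal synchronous sketch, assuming a hypothetical `ToolEntry` with a plain function pointer over `i64` in place of the SDK's async tools and JSON values.

```rust
// Hypothetical registry entry: a tool name plus a synchronous handler.
struct ToolEntry {
    name: &'static str,
    run: fn(i64) -> Result<i64, String>,
}

// Finds a tool by exact name and runs it; an unknown name is an error.
fn execute_by_name(tools: &[ToolEntry], name: &str, input: i64) -> Result<i64, String> {
    let tool = tools
        .iter()
        .find(|t| t.name == name)
        .ok_or_else(|| format!("Tool '{}' not found", name))?;
    // Errors from the tool itself propagate unchanged
    (tool.run)(input)
}
```

A linear scan is adequate for short tool lists; a `HashMap` keyed by name would be the natural change if the list grew large.
*/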
1387 async fn execute_tool_internal(
1388 &self,
1389 tool_name: &str,
1390 input: serde_json::Value,
1391 ) -> Result<serde_json::Value> {
1392 // Find tool in registered tools by name
1393 let tool = self
1394 .options
1395 .tools()
1396 .iter()
1397 .find(|t| t.name() == tool_name)
1398 .ok_or_else(|| Error::tool(format!("Tool '{}' not found", tool_name)))?;
1399
1400 // Execute the tool's async function
1401 tool.execute(input).await
1402 }
1403
1404 /// Auto-execution loop that handles tool calls automatically.
1405 ///
1406 /// This is the core implementation of automatic tool execution mode. It:
1407 ///
1408 /// 1. Collects all blocks from the current stream
1409 /// 2. Separates text blocks from tool use blocks
1410 /// 3. If there are tool blocks:
1411 /// - Executes PreToolUse hooks (can modify/block)
1412 /// - Executes each tool via its registered function
1413 /// - Executes PostToolUse hooks (can modify result)
1414 /// - Adds results to history
1415 /// - Continues conversation with send("")
1416 /// 4. Repeats until text-only response or max iterations
1417 /// 5. Returns all final text blocks
1418 ///
1419 /// # Returns
1420 ///
1421 /// - `Ok(blocks)`: Final text blocks after all tool iterations
1422 /// - `Err(e)`: Error during execution, stream processing, or interruption
1423 ///
1424 /// # Iteration Limit
1425 ///
1426 /// The loop is bounded by `options.max_tool_iterations` to prevent infinite loops.
1427 /// If the model still requests tools after that many rounds, those calls are not
1428 /// executed; only the text blocks from that final response are returned.
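/**
The loop's control flow, including where the iteration limit applies, can be sketched without any networking. This is a minimal illustration under stated assumptions: `Turn` and `auto_loop` are hypothetical, the `model` closure stands in for a streamed response, tool calls and history entries are plain strings, and hooks are omitted.

```rust
// Hypothetical stand-in for one model response.
struct Turn {
    text: String,
    tool_calls: Vec<String>,
}

// Runs tool rounds until the model answers with text only or
// `max_iterations` rounds have executed.
fn auto_loop(
    model: impl Fn(&[String]) -> Turn,
    run_tool: impl Fn(&str) -> Result<String, String>,
    max_iterations: usize,
) -> (String, Vec<String>) {
    let mut history: Vec<String> = Vec::new();
    let mut iteration = 0;
    loop {
        let turn = model(history.as_slice());
        if turn.tool_calls.is_empty() {
            return (turn.text, history); // text-only reply: final answer
        }
        iteration += 1;
        if iteration > max_iterations {
            return (turn.text, history); // limit hit: keep text, skip these calls
        }
        for call in &turn.tool_calls {
            // A failing tool becomes a result the model can react to
            let result = match run_tool(call.as_str()) {
                Ok(out) => out,
                Err(e) => format!("error: {e}"),
            };
            history.push(format!("{call} -> {result}"));
        }
    }
}
```

With `max_iterations = 2` and a model that always requests a tool, two rounds of tools run and the third response's text is returned without executing its calls, mirroring the STEP 4 check.
*/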
1429 ///
1430 /// # Hook Integration
1431 ///
1432 /// Hooks are executed for each tool call:
1433 /// - **PreToolUse**: Can modify input or block execution entirely
1434 /// - **PostToolUse**: Can modify the result before it's added to history
1435 ///
1436 /// If a hook blocks execution, a JSON error response is used as the tool result.
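/**
For each tool call, the hooks wrap execution in a fixed order: the PreToolUse decision chooses between running as-is, running with modified input, or producing a blocked result, and the PostToolUse hook can then replace whatever result was produced. A minimal sketch with hypothetical `PreDecision` and `run_with_hooks` names, plain strings in place of JSON, and closures in place of registered hooks.

```rust
// Hypothetical outcome of a PreToolUse hook.
enum PreDecision {
    Continue,
    ModifyInput(String),
    Block(Option<String>),
}

// Runs one tool call through a pre-hook, the tool, and a post-hook that may
// replace the result.
fn run_with_hooks(
    input: &str,
    pre: impl Fn(&str) -> PreDecision,
    tool: impl Fn(&str) -> String,
    post: impl Fn(&str) -> Option<String>,
) -> String {
    let result = match pre(input) {
        PreDecision::Continue => tool(input),
        PreDecision::ModifyInput(new_input) => tool(&new_input),
        // A blocked call still yields a result the model can read
        PreDecision::Block(reason) => format!(
            "blocked: {}",
            reason.unwrap_or_else(|| "No reason provided".to_string())
        ),
    };
    // The post-hook sees the result (blocked or not) and may override it
    post(&result).unwrap_or(result)
}
```
*/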
1437 ///
1438 /// # State Management
1439 ///
1440 /// The loop maintains history by adding:
1441 /// - Assistant messages with text + tool use blocks
1442 /// - User messages with tool result blocks, plus the empty prompt added by each `send("")`
1443 ///
1444 /// `send()` serializes these as an assistant message with `tool_calls` followed by one `tool` message per result.
1445 ///
1446 /// # Error Recovery
1447 ///
1448 /// If a tool execution fails, the error is converted to a JSON error response
1449 /// and added as the tool result. This allows the conversation to continue
1450 /// and lets the model handle the error.
1451 async fn auto_execute_loop(&mut self) -> Result<Vec<ContentBlock>> {
1452 use crate::types::ToolResultBlock;
1453
1454 // Track iterations to prevent infinite loops
1455 let mut iteration = 0;
1456 let max_iterations = self.options.max_tool_iterations();
1457
1458 loop {
1459 // ========================================================================
1460 // STEP 1: Collect all blocks from current stream
1461 // ========================================================================
1462 // Buffer the entire response to determine if it contains tool calls
1463 let blocks = self.collect_all_blocks().await?;
1464
1465 // Empty response means stream ended or was interrupted
1466 if blocks.is_empty() {
1467 return Ok(Vec::new());
1468 }
1469
1470 // ========================================================================
1471 // STEP 2: Separate text blocks from tool use blocks
1472 // ========================================================================
1473 // The model can return a mix of text and tool calls in one response
1474 let mut text_blocks = Vec::new();
1475 let mut tool_blocks = Vec::new();
1476
1477 for block in blocks {
1478 match block {
1479 ContentBlock::Text(_) => text_blocks.push(block),
1480 ContentBlock::ToolUse(_) => tool_blocks.push(block),
1481 _ => {} // Ignore ToolResult and other variants
1482 }
1483 }
1484
1485 // ========================================================================
1486 // STEP 3: Check if we're done (no tool calls)
1487 // ========================================================================
1488 // If the response contains no tool calls, we've reached the final answer
1489 if tool_blocks.is_empty() {
1490 // Add assistant's final text response to history
1491 if !text_blocks.is_empty() {
1492 let assistant_msg = Message::assistant(text_blocks.clone());
1493 self.history.push(assistant_msg);
1494 }
1495 // Return text blocks to caller via buffered receive()
1496 return Ok(text_blocks);
1497 }
1498
1499 // ========================================================================
1500 // STEP 4: Check iteration limit BEFORE executing tools
1501 // ========================================================================
1502 // Increment counter and check if we've hit the max
1503 iteration += 1;
1504 if iteration > max_iterations {
1505 // Max iterations reached - stop execution and return what we have
1506 // This prevents infinite tool-calling loops
1507 if !text_blocks.is_empty() {
1508 let assistant_msg = Message::assistant(text_blocks.clone());
1509 self.history.push(assistant_msg);
1510 }
1511 return Ok(text_blocks);
1512 }
1513
1514 // ========================================================================
1515 // STEP 5: Add assistant message to history
1516 // ========================================================================
1517 // The assistant message includes BOTH text and tool use blocks
1518 // This preserves the full context for future turns
1519 let mut all_blocks = text_blocks.clone();
1520 all_blocks.extend(tool_blocks.clone());
1521 let assistant_msg = Message::assistant(all_blocks);
1522 self.history.push(assistant_msg);
1523
1524 // ========================================================================
1525 // STEP 6: Execute all tools and collect results
1526 // ========================================================================
1527 for block in tool_blocks {
1528 if let ContentBlock::ToolUse(tool_use) = block {
1529 // Create simplified history snapshot for hooks
1530 // TODO: Full serialization of history for hooks
1531 let history_snapshot: Vec<serde_json::Value> =
1532 self.history.iter().map(|_| serde_json::json!({})).collect();
1533
1534 // ============================================================
1535 // Execute PreToolUse hooks
1536 // ============================================================
1537 use crate::hooks::PreToolUseEvent;
1538 let pre_event = PreToolUseEvent::new(
1539 tool_use.name.clone(),
1540 tool_use.input.clone(),
1541 tool_use.id.clone(),
1542 history_snapshot.clone(),
1543 );
1544
1545 // Track whether to execute and what input to use
1546 let mut tool_input = tool_use.input.clone();
1547 let mut should_execute = true;
1548 let mut block_reason = None;
1549
1550 // Execute all PreToolUse hooks
1551 if let Some(decision) =
1552 self.options.hooks().execute_pre_tool_use(pre_event).await
1553 {
1554 if !decision.continue_execution() {
1555 // Hook blocked execution
1556 should_execute = false;
1557 block_reason = decision.reason().map(|s| s.to_string());
1558 } else if let Some(modified) = decision.modified_input() {
1559 // Hook modified the input
1560 tool_input = modified.clone();
1561 }
1562 }
1563
1564 // ============================================================
1565 // Execute tool (or create error result if blocked)
1566 // ============================================================
1567 let result = if should_execute {
1568 // Actually execute the tool
1569 match self
1570 .execute_tool_internal(&tool_use.name, tool_input.clone())
1571 .await
1572 {
1573 Ok(res) => res, // Success - use the result
1574 Err(e) => {
1575 // Tool execution failed - convert to JSON error
1576 // This allows the conversation to continue
1577 serde_json::json!({
1578 "error": e.to_string(),
1579 "tool": tool_use.name,
1580 "id": tool_use.id
1581 })
1582 }
1583 }
1584 } else {
1585 // Tool blocked by PreToolUse hook - create error result
1586 serde_json::json!({
1587 "error": "Tool execution blocked by hook",
1588 "reason": block_reason.unwrap_or_else(|| "No reason provided".to_string()),
1589 "tool": tool_use.name,
1590 "id": tool_use.id
1591 })
1592 };
1593
1594 // ============================================================
1595 // Execute PostToolUse hooks
1596 // ============================================================
1597 use crate::hooks::PostToolUseEvent;
1598 let post_event = PostToolUseEvent::new(
1599 tool_use.name.clone(),
1600 tool_input,
1601 tool_use.id.clone(),
1602 result.clone(),
1603 history_snapshot,
1604 );
1605
1606 let mut final_result = result;
1607 if let Some(decision) =
1608 self.options.hooks().execute_post_tool_use(post_event).await
1609 {
1610 // PostToolUse can modify the result
1611 // Note: Uses modified_input field (naming is historical)
1612 if let Some(modified) = decision.modified_input() {
1613 final_result = modified.clone();
1614 }
1615 }
1616
1617 // ============================================================
1618 // Add tool result to history
1619 // ============================================================
1620 // Stored as a user message holding a ToolResult block; send() converts it to a `tool` role message
1621 let tool_result = ToolResultBlock::new(&tool_use.id, final_result);
1622 let tool_result_msg =
1623 Message::user_with_blocks(vec![ContentBlock::ToolResult(tool_result)]);
1624 self.history.push(tool_result_msg);
1625 }
1626 }
1627
1628 // ========================================================================
1629 // STEP 7: Continue conversation to get next response
1630 // ========================================================================
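1631 // send() resets the interrupt flag, so honor an interrupt raised during tool execution first
1632 if self.interrupted.load(Ordering::SeqCst) { return Err(Error::other("Operation interrupted during tool execution")); }
1633 self.send("").await?; // Empty prompt continues the turn; the history contains all context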
1634 // Loop continues to collect and process the next response
1635 // This will either be more tool calls or the final text answer
1636 }
1637 }
1638
1639 /// Receives the next content block from the current stream.
1640 ///
1641 /// This is the primary method for consuming responses from the model. It works
1642 /// differently depending on the operating mode:
1643 ///
1644 /// ## Manual Mode (default)
1645 ///
1646 /// Streams blocks directly from the API response as they arrive. You receive:
1647 /// - `TextBlock`: Incremental text from the model
1648 /// - `ToolUseBlock`: Requests to execute tools
1649 /// - Other block types as they're emitted
1650 ///
1651 /// When you receive a `ToolUseBlock`, you must:
1652 /// 1. Execute the tool yourself
1653 /// 2. Call `add_tool_result()` with the result
1654 /// 3. Call `send("")` to continue the conversation
1655 ///
1656 /// ## Automatic Mode (`auto_execute_tools = true`)
1657 ///
1658 /// Transparently executes tools and only returns final text blocks. The first
1659 /// call to `receive()` triggers the auto-execution loop which:
1660 /// 1. Collects all blocks from the stream
1661 /// 2. Executes any tool calls automatically
1662 /// 3. Continues the conversation until reaching a text-only response
1663 /// 4. Buffers the final text blocks
1664 /// 5. Returns the first block immediately and the rest on subsequent `receive()` calls
1665 ///
1666 /// # Returns
1667 ///
1668 /// - `Ok(Some(block))`: Successfully received a content block
1669 /// - `Ok(None)`: Stream ended normally or was interrupted
1670 /// - `Err(e)`: An error occurred during streaming or tool execution
1671 ///
1672 /// # Behavior Details
1673 ///
1674 /// ## Interruption
1675 ///
1676 /// Checks the interrupt flag on every call. If interrupted, immediately returns
1677 /// `Ok(None)` and clears the stream. The client can be reused after interruption.
1678 ///
1679 /// ## Stream Lifecycle
1680 ///
1681 /// 1. After `send()`, stream is active
1682 /// 2. Each `receive()` call yields one block
1683 /// 3. When stream ends, returns `Ok(None)`
1684 /// 4. Subsequent calls continue returning `Ok(None)` until next `send()`
1685 ///
1686 /// ## Auto-Execution Buffer
1687 ///
1688 /// In auto mode, blocks are buffered in memory. The buffer persists until
1689 /// fully consumed (index reaches length), at which point it's cleared.
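/**
The buffer works like a cursor over a batch: the first call fills it (in the client, by running the auto-execution loop), each call hands out the next item, and once the cursor passes the end the buffer is cleared so the next batch can be loaded. A minimal sketch, assuming a hypothetical `Replay` type and a `refill` closure standing in for the loop.

```rust
// Hypothetical cursor-style buffer mirroring auto_exec_buffer and auto_exec_index.
struct Replay<T: Clone> {
    buffer: Vec<T>,
    index: usize,
}

impl<T: Clone> Replay<T> {
    fn new() -> Self {
        Replay { buffer: Vec::new(), index: 0 }
    }

    // Returns the next item; `refill` runs only when no batch is loaded.
    fn next_with(&mut self, refill: impl FnOnce() -> Vec<T>) -> Option<T> {
        if self.buffer.is_empty() {
            self.buffer = refill();
            self.index = 0;
        }
        if self.index < self.buffer.len() {
            let item = self.buffer[self.index].clone();
            self.index += 1;
            Some(item)
        } else {
            // Fully consumed: clear so the next call loads a fresh batch
            self.buffer.clear();
            None
        }
    }
}
```
*/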
1690 ///
1691 /// # State Changes
1692 ///
1693 /// - Advances stream position
1694 /// - In auto mode: May trigger entire execution loop and modify history
1695 /// - In manual mode: Only reads from stream, no history changes
1696 /// - Increments `auto_exec_index` when returning buffered blocks
1697 ///
1698 /// # Examples
1699 ///
1700 /// ## Manual Mode - Basic
1701 ///
1702 /// ```rust,no_run
1703 /// # use open_agent::{Client, AgentOptions, ContentBlock};
1704 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1705 /// # let mut client = Client::new(AgentOptions::default())?;
1706 /// client.send("Hello!").await?;
1707 ///
1708 /// while let Some(block) = client.receive().await? {
1709 /// match block {
1710 /// ContentBlock::Text(text) => print!("{}", text.text),
1711 /// _ => {}
1712 /// }
1713 /// }
1714 /// # Ok(())
1715 /// # }
1716 /// ```
1717 ///
1718 /// ## Manual Mode - With Tools
1719 ///
1720 /// ```rust,no_run
1721 /// # use open_agent::{Client, AgentOptions, ContentBlock};
1722 /// # use serde_json::json;
1723 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1724 /// # let mut client = Client::new(AgentOptions::default())?;
1725 /// client.send("Use the calculator").await?;
1726 ///
1727 /// while let Some(block) = client.receive().await? {
1728 /// match block {
1729 /// ContentBlock::Text(text) => {
1730 /// println!("{}", text.text);
1731 /// }
1732 /// ContentBlock::ToolUse(tool_use) => {
1733 /// println!("Executing: {}", tool_use.name);
1734 ///
1735 /// // Execute tool manually
1736 /// let result = json!({"result": 42});
1737 ///
1738 /// // Add result and continue
1739 /// client.add_tool_result(&tool_use.id, result)?;
1740 /// client.send("").await?;
1741 /// }
1742 /// _ => {}
1743 /// }
1744 /// }
1745 /// # Ok(())
1746 /// # }
1747 /// ```
1748 ///
1749 /// ## Auto Mode
1750 ///
1751 /// ```rust,no_run
1752 /// # use open_agent::{Client, AgentOptions, ContentBlock, Tool};
1753 /// # use serde_json::json;
1754 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1755 /// let mut client = Client::new(AgentOptions::builder()
1756 /// .auto_execute_tools(true)
1757 /// .build()?)?;
1758 ///
1759 /// client.send("Calculate 2+2").await?;
1760 ///
1761 /// // Tools execute automatically - you only get final text
1762 /// while let Some(block) = client.receive().await? {
1763 /// if let ContentBlock::Text(text) = block {
1764 /// println!("{}", text.text);
1765 /// }
1766 /// }
1767 /// # Ok(())
1768 /// # }
1769 /// ```
1770 ///
1771 /// ## With Error Handling
1772 ///
1773 /// ```rust,no_run
1774 /// # use open_agent::{Client, AgentOptions};
1775 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1776 /// # let mut client = Client::new(AgentOptions::default())?;
1777 /// client.send("Hello").await?;
1778 ///
1779 /// loop {
1780 /// match client.receive().await {
1781 /// Ok(Some(block)) => {
1782 /// // Process block
1783 /// }
1784 /// Ok(None) => {
1785 /// // Stream ended
1786 /// break;
1787 /// }
1788 /// Err(e) => {
1789 /// eprintln!("Error: {}", e);
1790 /// break;
1791 /// }
1792 /// }
1793 /// }
1794 /// # Ok(())
1795 /// # }
1796 /// ```
    pub async fn receive(&mut self) -> Result<Option<ContentBlock>> {
        // ========================================================================
        // AUTO-EXECUTION MODE
        // ========================================================================
        if self.options.auto_execute_tools() {
            // Check if we have buffered blocks to return
            // In auto mode, all final text blocks are buffered and returned one at a time
            if self.auto_exec_index < self.auto_exec_buffer.len() {
                // Return next buffered block
                let block = self.auto_exec_buffer[self.auto_exec_index].clone();
                self.auto_exec_index += 1;
                return Ok(Some(block));
            }

            // Nothing buffered yet: run the auto-execution loop. This normally happens
            // on the first receive() call after send(). If the loop produced no blocks,
            // the buffer stays empty and the next call re-enters this branch.
            if self.auto_exec_buffer.is_empty() {
                // Buffer all final text blocks; errors from the loop propagate via `?`
                self.auto_exec_buffer = self.auto_execute_loop().await?;
                self.auto_exec_index = 0;

                // If no blocks, return None (empty response)
                if self.auto_exec_buffer.is_empty() {
                    return Ok(None);
                }

                // Return first buffered block
                let block = self.auto_exec_buffer[0].clone();
                self.auto_exec_index = 1;
                return Ok(Some(block));
            }

            // Buffer exhausted: return None
            Ok(None)
        } else {
            // ====================================================================
            // MANUAL MODE
            // ====================================================================
            // Stream blocks directly from API without buffering or auto-execution
            self.receive_one().await
        }
    }

    /// Interrupts the current operation by setting the interrupt flag.
    ///
    /// This method provides a thread-safe way to cancel any in-progress streaming
    /// operation. The interrupt flag is checked by `receive()` before each block,
    /// allowing responsive cancellation.
    ///
    /// # Behavior
    ///
    /// - Sets the atomic interrupt flag to `true`
    /// - The next `receive()` call returns `Ok(None)` and clears the stream; in
    ///   auto-execution mode, blocks already buffered by a finished tool loop are
    ///   still returned first
    /// - The flag is reset to `false` automatically by the next `send()` call
    /// - Safe to call from any thread (uses atomic operations)
    /// - Idempotent: calling it multiple times has the same effect as calling it once
    /// - Harmless if no operation is in progress: the flag is simply cleared by the next `send()`
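    ///
    /// As a rough model of this lifecycle, the flag is set by `interrupt()`, observed
    /// by the next read, and cleared when a new request starts. This is a sketch and
    /// not the client's actual code. `FlagModel` and its methods are hypothetical
    /// names that mirror `interrupt()`, `send()`, and the check inside `receive()`:
    ///
    /// ```rust
    /// use std::sync::atomic::{AtomicBool, Ordering};
    /// use std::sync::Arc;
    ///
    /// // Hypothetical model of the interrupt flag lifecycle described above.
    /// struct FlagModel {
    ///     interrupted: Arc<AtomicBool>,
    /// }
    ///
    /// impl FlagModel {
    ///     // Like `interrupt()`: setting the flag twice has the same effect as once.
    ///     fn interrupt(&self) {
    ///         self.interrupted.store(true, Ordering::SeqCst);
    ///     }
    ///
    ///     // Like `send()`: starting a new request clears an earlier interruption.
    ///     fn start_request(&self) {
    ///         self.interrupted.store(false, Ordering::SeqCst);
    ///     }
    ///
    ///     // Like the check in `receive()`: `true` means "stop and return Ok(None)".
    ///     fn should_stop(&self) -> bool {
    ///         self.interrupted.load(Ordering::SeqCst)
    ///     }
    /// }
    /// ```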
    ///
    /// # Thread Safety
    ///
    /// This method uses `Arc<AtomicBool>` internally, which can be safely shared
    /// across threads. You can clone the interrupt handle and use it from different
    /// threads or async tasks:
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    /// let interrupt_handle = client.interrupt_handle();
    ///
    /// // Use from another thread
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    /// # Ok(())
    /// # }
    /// ```
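    ///
    /// The same handle also works with plain OS threads, because `Arc<AtomicBool>`
    /// is `Send + Sync`. The std-only sketch below sets the flag from a worker
    /// thread. The helper name `interrupt_from_thread` is hypothetical:
    ///
    /// ```rust
    /// use std::sync::atomic::{AtomicBool, Ordering};
    /// use std::sync::Arc;
    /// use std::thread::{self, JoinHandle};
    ///
    /// // Hypothetical helper: moves a clone of the handle into a worker thread
    /// // that requests cancellation.
    /// fn interrupt_from_thread(handle: Arc<AtomicBool>) -> JoinHandle<()> {
    ///     thread::spawn(move || {
    ///         handle.store(true, Ordering::SeqCst);
    ///     })
    /// }
    /// ```
    ///
    /// After the worker thread is joined, the original handle reads `true`.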
    ///
    /// # State Changes
    ///
    /// - Sets the `interrupted` flag to `true`
    /// - Does NOT modify stream, history, or other state directly
    /// - Takes effect on the next `receive()` call
    ///
    /// # Use Cases
    ///
    /// - User cancellation (e.g., stop button in UI)
    /// - Timeout enforcement
    /// - Resource cleanup
    /// - Emergency shutdown
    ///
    /// # Examples
    ///
    /// ## Basic Interruption
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// client.send("Tell me a long story").await?;
    ///
    /// // Interrupt after receiving some blocks
    /// let mut count = 0;
    /// while let Some(_block) = client.receive().await? {
    ///     count += 1;
    ///     if count >= 5 {
    ///         client.interrupt();
    ///     }
    /// }
    ///
    /// // Client is ready for new queries
    /// client.send("What's 2+2?").await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## With Timeout
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions};
    /// use std::time::Duration;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// client.send("Long request").await?;
    ///
    /// // Spawn timeout task
    /// let interrupt_handle = client.interrupt_handle();
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(Duration::from_secs(10)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    ///
    /// while let Some(_block) = client.receive().await? {
    ///     // Process until timeout
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn interrupt(&self) {
        // Set the interrupt flag; SeqCst is the strongest and simplest ordering to reason about
        self.interrupted.store(true, Ordering::SeqCst);
    }

    /// Returns a clone of the interrupt handle for thread-safe cancellation.
    ///
    /// This method provides access to the shared `Arc<AtomicBool>` interrupt flag,
    /// allowing it to be used from other threads or async tasks to signal cancellation.
    ///
    /// # Returns
    ///
    /// A clone of the `Arc<AtomicBool>` that points to the same flag as the client,
    /// so storing `true` through it interrupts operations from any thread.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    /// let interrupt_handle = client.interrupt_handle();
    ///
    /// // Use from another thread
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    /// # Ok(())
    /// # }
    /// ```
    pub fn interrupt_handle(&self) -> Arc<AtomicBool> {
        self.interrupted.clone()
    }

    /// Returns a reference to the conversation history.
    ///
    /// The history contains all messages exchanged in the conversation, including:
    /// - User messages
    /// - Assistant messages (with text and tool use blocks)
    /// - Tool result messages
    ///
    /// # Returns
    ///
    /// A slice of `Message` objects in chronological order.
    ///
    /// # Use Cases
    ///
    /// - Inspecting conversation context
    /// - Debugging tool execution flow
    /// - Saving conversation state
    /// - Implementing custom history management
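    ///
    /// For example, debugging a tool loop often starts by counting messages per role.
    /// The std-only sketch below works over a hypothetical `Role` enum that stands in
    /// for this crate's `MessageRole`. The variants shown are an assumption:
    ///
    /// ```rust
    /// // Hypothetical stand-in for `MessageRole`.
    /// #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    /// enum Role {
    ///     User,
    ///     Assistant,
    ///     Tool,
    /// }
    ///
    /// // Counts how many messages in a history slice have the given role.
    /// fn count_role(history: &[Role], role: Role) -> usize {
    ///     history.iter().filter(|r| **r == role).count()
    /// }
    /// ```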
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::default())?;
    ///
    /// // Initially empty
    /// assert_eq!(client.history().len(), 0);
    /// # Ok(())
    /// # }
    /// ```
    pub fn history(&self) -> &[Message] {
        &self.history
    }

    /// Returns a mutable reference to the conversation history.
    ///
    /// This allows you to modify the history directly for advanced use cases like:
    /// - Removing old messages to manage context length
    /// - Editing messages for retry scenarios
    /// - Injecting synthetic messages for testing
    ///
    /// # Warning
    ///
    /// Modifying history directly can lead to inconsistent conversation state if not
    /// done carefully. The SDK expects history to follow the proper message flow
    /// (user → assistant → tool results → assistant, etc.).
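    ///
    /// One way to trim history while respecting that flow is sketched below. It
    /// assumes that a tool result must not appear without the assistant message
    /// that requested it. The function drops the oldest messages, then keeps
    /// dropping until the history no longer starts with a tool result. `Role` and
    /// `trim_oldest` are hypothetical stand-ins, not part of this crate:
    ///
    /// ```rust
    /// // Hypothetical stand-in for `MessageRole`.
    /// #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    /// enum Role {
    ///     User,
    ///     Assistant,
    ///     Tool,
    /// }
    ///
    /// // Removes messages from the front until at most `max_len` remain, then
    /// // removes any leading tool results whose assistant message was dropped.
    /// fn trim_oldest(history: &mut Vec<Role>, max_len: usize) {
    ///     if history.len() > max_len {
    ///         let excess = history.len() - max_len;
    ///         history.drain(0..excess);
    ///     }
    ///     let orphaned = history.iter().take_while(|r| **r == Role::Tool).count();
    ///     history.drain(0..orphaned);
    /// }
    /// ```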
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// // Remove oldest messages to stay within context limit
    /// if client.history().len() > 50 {
    ///     client.history_mut().drain(0..10);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn history_mut(&mut self) -> &mut Vec<Message> {
        &mut self.history
    }

    /// Returns a reference to the agent configuration options.
    ///
    /// Provides read-only access to the `AgentOptions` used to configure this client.
    ///
    /// # Use Cases
    ///
    /// - Inspecting current configuration
    /// - Debugging issues
    /// - Conditional logic based on settings
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::builder()
    ///     .model("gpt-4")
    ///     .base_url("http://localhost:1234/v1")
    ///     .build()?)?;
    ///
    /// println!("Using model: {}", client.options().model());
    /// # Ok(())
    /// # }
    /// ```
    pub fn options(&self) -> &AgentOptions {
        &self.options
    }

    /// Clears all conversation history.
    ///
    /// This resets the conversation to a blank slate while preserving the client
    /// configuration (tools, hooks, model, etc.). The next message will start a
    /// fresh conversation with no prior context.
    ///
    /// # State Changes
    ///
    /// - Clears `history` vector
    /// - Does NOT modify current stream, options, or other state
    ///
    /// # Use Cases
    ///
    /// - Starting a new conversation
    /// - Preventing context length issues
    /// - Clearing sensitive data
    /// - Implementing conversation sessions
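    ///
    /// To implement sessions without losing the earlier conversation, the history can
    /// be moved out instead of cleared, e.g. with `std::mem::take(client.history_mut())`,
    /// which leaves an empty history behind. The generic std-only sketch below switches
    /// between sessions. `switch_session` is a hypothetical helper, and `T` stands in
    /// for `Message`:
    ///
    /// ```rust
    /// use std::collections::HashMap;
    ///
    /// // Saves the active history under `from` and loads the history stored
    /// // under `to` (or an empty one if `to` is a new session).
    /// fn switch_session<T>(
    ///     active: &mut Vec<T>,
    ///     store: &mut HashMap<String, Vec<T>>,
    ///     from: &str,
    ///     to: &str,
    /// ) {
    ///     let saved = std::mem::take(active);
    ///     store.insert(from.to_string(), saved);
    ///     *active = store.remove(to).unwrap_or_default();
    /// }
    /// ```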
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// // First conversation
    /// client.send("Hello").await?;
    /// while let Some(_) = client.receive().await? {}
    ///
    /// // Clear and start fresh
    /// client.clear_history();
    ///
    /// // New conversation with no memory of the previous one
    /// client.send("Hello again").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn clear_history(&mut self) {
        self.history.clear();
    }

    /// Adds a tool result to the conversation history for manual tool execution.
    ///
    /// This method is used exclusively in **manual mode** after receiving a `ToolUseBlock`.
    /// The workflow is:
    ///
    /// 1. `receive()` returns a `ToolUseBlock`
    /// 2. You execute the tool yourself
    /// 3. Call `add_tool_result()` with the tool's output
    /// 4. Call `send("")` to continue the conversation
    /// 5. The model receives the tool result and generates a response
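    ///
    /// When a single response contains several tool calls, steps 1 to 3 can be
    /// repeated for each call before the single `send("")` in step 4, as in the
    /// "Multiple Tool Calls" example. The std-only sketch below shows that
    /// collect-then-answer pattern. `Block`, `answer_tool_calls`, and the `execute`
    /// callback are hypothetical stand-ins for `ContentBlock` and your own tool code:
    ///
    /// ```rust
    /// // Hypothetical stand-in for the blocks returned by `receive()`.
    /// enum Block {
    ///     Text(String),
    ///     ToolUse { id: String, name: String, input: String },
    /// }
    ///
    /// // Runs every requested tool and returns (tool_use_id, output) pairs,
    /// // ready to be passed to `add_tool_result()` one by one.
    /// fn answer_tool_calls<F>(blocks: Vec<Block>, execute: F) -> Vec<(String, String)>
    /// where
    ///     F: Fn(&str, &str) -> String,
    /// {
    ///     blocks
    ///         .into_iter()
    ///         .filter_map(|block| match block {
    ///             Block::ToolUse { id, name, input } => {
    ///                 let output = execute(&name, &input);
    ///                 Some((id, output))
    ///             }
    ///             Block::Text(_) => None,
    ///         })
    ///         .collect()
    /// }
    /// ```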
    ///
    /// # Parameters
    ///
    /// - `tool_use_id`: The unique ID from the `ToolUseBlock` (must match exactly)
    /// - `content`: The tool's output as a JSON value
    ///
    /// # Behavior
    ///
    /// Creates a `ToolResultBlock` and appends a tool message to the conversation
    /// history. In the current implementation only the block's `content` is
    /// serialized (as a JSON string inside a `TextBlock`); `tool_use_id` is not yet
    /// stored in the message, so the explicit call/result pairing used by
    /// OpenAI-compatible APIs is not preserved until that serialization is completed.
    ///
    /// # State Changes
    ///
    /// - Appends a tool message to `history`
    /// - Does NOT modify stream or trigger any requests
    ///
    /// # Errors
    ///
    /// Returns a configuration error if the result content cannot be serialized.
    ///
    /// # Important Notes
    ///
    /// - **Not used in auto mode**: Auto-execution handles tool results automatically
    /// - **ID must match**: The `tool_use_id` must match the ID from the `ToolUseBlock`
    /// - **No validation**: This method doesn't validate the result format
    /// - **Must call send()**: After adding result(s), call `send("")` to continue
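    ///
    /// Because this method performs no validation, a caller that tracks IDs itself
    /// can check the pairing before calling `send("")`. The std-only sketch below does
    /// this check. `unanswered_calls` is a hypothetical helper:
    ///
    /// ```rust
    /// use std::collections::HashSet;
    ///
    /// // Returns the tool call IDs (in request order) that have no matching result yet.
    /// fn unanswered_calls(call_ids: &[&str], result_ids: &[&str]) -> Vec<String> {
    ///     let answered: HashSet<&str> = result_ids.iter().copied().collect();
    ///     call_ids
    ///         .iter()
    ///         .filter(|id| !answered.contains(**id))
    ///         .map(|id| id.to_string())
    ///         .collect()
    /// }
    /// ```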
    ///
    /// # Examples
    ///
    /// ## Basic Manual Tool Execution
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Use the calculator").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     match block {
    ///         ContentBlock::ToolUse(tool_use) => {
    ///             // Execute tool manually
    ///             let result = json!({"result": 42});
    ///
    ///             // Add result to history
    ///             client.add_tool_result(&tool_use.id, result)?;
    ///
    ///             // Continue conversation to get model's response
    ///             client.send("").await?;
    ///         }
    ///         ContentBlock::Text(text) => {
    ///             println!("{}", text.text);
    ///         }
    ///         _ => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Handling Tool Errors
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// # client.send("test").await?;
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         // Try to execute tool
    ///         let result = match execute_tool(&tool_use.name, &tool_use.input) {
    ///             Ok(output) => output,
    ///             Err(e) => json!({
    ///                 "error": e.to_string(),
    ///                 "tool": tool_use.name
    ///             })
    ///         };
    ///
    ///         client.add_tool_result(&tool_use.id, result)?;
    ///         client.send("").await?;
    ///     }
    /// }
    ///
    /// # fn execute_tool(name: &str, input: &serde_json::Value) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
    /// #     Ok(json!({}))
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Multiple Tool Calls
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Calculate 2+2 and 3+3").await?;
    ///
    /// let mut tool_calls = Vec::new();
    ///
    /// // Collect all tool calls
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         tool_calls.push(tool_use);
    ///     }
    /// }
    ///
    /// // Execute and add results for all tools
    /// for tool_call in tool_calls {
    ///     let result = json!({"result": 42}); // Execute tool
    ///     client.add_tool_result(&tool_call.id, result)?;
    /// }
    ///
    /// // Continue conversation
    /// client.send("").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn add_tool_result(&mut self, tool_use_id: &str, content: serde_json::Value) -> Result<()> {
        use crate::types::ToolResultBlock;

        // Create a tool result block with the given ID and content
        let result_block = ToolResultBlock::new(tool_use_id, content);

        // Add to history as a tool message.
        // Note: this is a simplified representation. Only the content is serialized
        // into a TextBlock; `tool_use_id` is not carried over into the message.
        // TODO: Properly serialize ToolResultBlock for OpenAI format
        let serialized = serde_json::to_string(&result_block.content)
            .map_err(|e| Error::config(format!("Failed to serialize tool result: {}", e)))?;

        self.history.push(Message::new(
            MessageRole::Tool,
            vec![ContentBlock::Text(TextBlock::new(serialized))],
        ));

        Ok(())
    }

    /// Looks up a registered tool by name.
    ///
    /// This method provides access to the tool registry for manual execution scenarios.
    /// It searches the tools registered in `AgentOptions` and returns a reference to
    /// the matching tool if found.
    ///
    /// # Parameters
    ///
    /// - `name`: The tool name to search for (case-sensitive)
    ///
    /// # Returns
    ///
    /// - `Some(&Tool)`: Tool found
    /// - `None`: No tool with that name
    ///
    /// # Use Cases
    ///
    /// - Manual tool execution in response to `ToolUseBlock`
    /// - Validating tool availability before offering features
    /// - Inspecting tool metadata (name, description, schema)
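    ///
    /// The lookup is a linear, case-sensitive scan over the registered tools
    /// (`iter().find(...)`). The std-only sketch below performs the same search over
    /// a hypothetical `ToolInfo` type that stands in for a registered tool:
    ///
    /// ```rust
    /// // Hypothetical stand-in for a registered tool.
    /// struct ToolInfo {
    ///     name: String,
    /// }
    ///
    /// // Returns the first tool whose name matches exactly (case-sensitive).
    /// fn find_tool<'a>(tools: &'a [ToolInfo], name: &str) -> Option<&'a ToolInfo> {
    ///     tools.iter().find(|t| t.name == name)
    /// }
    /// ```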
    ///
    /// # Examples
    ///
    /// ## Execute Tool Manually
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// # client.send("test").await?;
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         if let Some(tool) = client.get_tool(&tool_use.name) {
    ///             // Execute the tool
    ///             let result = tool.execute(tool_use.input.clone()).await?;
    ///             client.add_tool_result(&tool_use.id, result)?;
    ///             client.send("").await?;
    ///         } else {
    ///             println!("Unknown tool: {}", tool_use.name);
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Check Tool Availability
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let client = Client::new(AgentOptions::default())?;
    /// if client.get_tool("calculator").is_some() {
    ///     println!("Calculator is available");
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn get_tool(&self, name: &str) -> Option<&crate::tools::Tool> {
        // Search registered tools by name
        self.options
            .tools()
            .iter()
            .find(|t| t.name() == name)
            .map(|t| t.as_ref())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_client_creation() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        assert_eq!(client.history().len(), 0);
    }

    #[test]
    fn test_client_new_returns_result() {
        // Test that Client::new() returns Result instead of panicking
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        // This should not panic - it should return Ok(client)
        let result = Client::new(options);
        assert!(result.is_ok(), "Client::new() should return Ok");

        let client = result.unwrap();
        assert_eq!(client.history().len(), 0);
    }

    #[test]
    fn test_interrupt_flag_initial_state() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        // Initially not interrupted
        assert!(!client.interrupted.load(Ordering::SeqCst));
    }

    #[test]
    fn test_interrupt_sets_flag() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));
    }

    #[test]
    fn test_interrupt_idempotent() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));

        // Call again - should still be interrupted
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_receive_returns_none_when_interrupted() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // Interrupt before receiving
        client.interrupt();

        // receive() should return Ok(None) when interrupted
        let result = client.receive().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_receive_returns_ok_none_when_no_stream() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // No stream started - receive() should return Ok(None)
        let result = client.receive().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_receive_error_propagation() {
        // Errors are returned as Err(...), not Some(Err(...)), so callers can use `?`:
        //     while let Some(block) = client.receive().await? { ... }
        // Exercising an actual error path requires a mock stream that produces errors.
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");

        // Compile-time check: receive() returns Result<Option<ContentBlock>>.
        // With no active stream the call completes with Ok(None).
        let result: Result<Option<ContentBlock>> = client.receive().await;
        assert!(result.is_ok());
    }
}