open_agent/client.rs
//! Client for streaming queries and multi-turn conversations
//!
//! This module provides the core streaming client implementation for the Open Agent SDK.
//! It handles communication with OpenAI-compatible APIs, manages conversation history,
//! and provides two modes of operation: manual and automatic tool execution.
//!
//! # Architecture Overview
//!
//! The SDK implements a **streaming-first architecture** where all responses from the model
//! are received as a stream of content blocks. This design enables:
//!
//! - **Progressive rendering**: Display text as it arrives without waiting for completion
//! - **Real-time tool execution**: Execute tools as they're requested by the model
//! - **Interruption support**: Cancel operations mid-stream without corrupting state
//! - **Memory efficiency**: Process large responses without buffering everything in memory
//!
//! ## Two Operating Modes
//!
//! ### 1. Manual Tool Execution Mode (default)
//!
//! In manual mode, the client streams content blocks directly to the caller. When the model
//! requests a tool, the caller receives a `ToolUseBlock`, executes the tool, adds the result
//! using `add_tool_result()`, and continues the conversation with another `send()` call.
//!
//! **Use cases**: Custom tool execution logic, interactive debugging, fine-grained control
//!
//! ### 2. Automatic Tool Execution Mode
//!
//! When `auto_execute_tools` is enabled, the client automatically executes tools and continues
//! the conversation until receiving a text-only response. The caller only receives the final
//! text blocks after all tool iterations complete.
//!
//! **Use cases**: Simple agentic workflows, automated task completion, batch processing
//!
//! ## Request Flow
//!
//! ```text
//! User sends prompt
//!     │
//!     ├─> UserPromptSubmit hook executes (can modify/block prompt)
//!     │
//!     ├─> Prompt added to history
//!     │
//!     ├─> HTTP request to OpenAI-compatible API
//!     │
//!     ├─> Response streamed as Server-Sent Events (SSE)
//!     │
//!     ├─> SSE chunks aggregated into ContentBlocks
//!     │
//!     └─> Blocks emitted to caller (or buffered for auto-execution)
//! ```
//!
//! ## Tool Execution Flow
//!
//! ### Manual Mode:
//! ```text
//! receive() → ToolUseBlock
//!     │
//!     ├─> Caller executes tool
//!     │
//!     ├─> Caller calls add_tool_result()
//!     │
//!     ├─> Caller calls send("") to continue
//!     │
//!     └─> receive() → TextBlock (model's response)
//! ```
//!
//! ### Auto Mode:
//! ```text
//! receive() triggers auto-execution loop
//!     │
//!     ├─> Collect all blocks from stream
//!     │
//!     ├─> For each ToolUseBlock:
//!     │       ├─> PreToolUse hook executes (can modify/block)
//!     │       ├─> Tool executed via Tool.execute()
//!     │       ├─> PostToolUse hook executes (can modify result)
//!     │       └─> Result added to history
//!     │
//!     ├─> Continue conversation with send("")
//!     │
//!     ├─> Repeat until text-only response or max iterations
//!     │
//!     └─> Return text blocks one-by-one via receive()
//! ```
//!
//! ## State Management
//!
//! The client maintains several pieces of state:
//!
//! - **history**: Full conversation history (`Vec<Message>`)
//! - **current_stream**: Active SSE stream being consumed (`Option<ContentStream>`)
//! - **interrupted**: Atomic flag for cancellation (`Arc<AtomicBool>`)
//! - **auto_exec_buffer**: Buffered blocks for auto-execution mode (`Vec<ContentBlock>`)
//! - **auto_exec_index**: Current position in the buffer (`usize`)
//!
//! ## Interruption Mechanism
//!
//! The interrupt system uses an `Arc<AtomicBool>` to enable thread-safe cancellation:
//!
//! ```rust,no_run
//! # use open_agent::{Client, AgentOptions};
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let mut client = Client::new(AgentOptions::default())?;
//! let handle = client.interrupt_handle(); // Clone of the Arc for use in other tasks
//!
//! // In another thread or async task:
//! tokio::spawn(async move {
//!     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
//!     handle.store(true, std::sync::atomic::Ordering::SeqCst);
//! });
//!
//! client.send("Long request").await?;
//! while let Some(_block) = client.receive().await? {
//!     // Will stop when interrupted
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Hook Integration
//!
//! Hooks provide extension points throughout the request lifecycle:
//!
//! - **UserPromptSubmit**: Called before sending the user prompt (can modify or block it)
//! - **PreToolUse**: Called before executing each tool (can modify input or block execution)
//! - **PostToolUse**: Called after tool execution (can modify the result)
//!
//! Hooks are invoked only at the lifecycle points listed above, and each hook has access
//! to the conversation history.
//!
//! ## Error Handling
//!
//! Errors are propagated immediately and leave the client in a valid state:
//!
//! - **HTTP errors**: Network failures, timeouts, connection issues
//! - **API errors**: Invalid model, authentication failures, rate limits
//! - **Parse errors**: Malformed SSE responses, invalid JSON
//! - **Tool errors**: Tool execution failures (converted to JSON error responses)
//! - **Hook errors**: Hook execution failures or blocked operations
//!
//! After an error, the client remains usable for new requests.
//!
//! # Examples
//!
//! ## Simple Single-Turn Query
//!
//! ```rust,no_run
//! use open_agent::{query, AgentOptions};
//! use futures::StreamExt;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let options = AgentOptions::builder()
//!         .model("gpt-4")
//!         .api_key("sk-...")
//!         .build()?;
//!
//!     let mut stream = query("What is Rust?", &options).await?;
//!
//!     while let Some(block) = stream.next().await {
//!         if let open_agent::ContentBlock::Text(text) = block? {
//!             print!("{}", text.text);
//!         }
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Multi-Turn Conversation
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock};
//! use futures::StreamExt;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .build()?)?;
//!
//! // First question
//! client.send("What's the capital of France?").await?;
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//!
//! // Follow-up question (history is maintained)
//! client.send("What's its population?").await?;
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Manual Tool Execution
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
//! use serde_json::json;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let calculator = Tool::new(
//!     "calculator",
//!     "Performs arithmetic operations",
//!     json!({"type": "object", "properties": {"operation": {"type": "string"}}}),
//!     |_input| Box::pin(async move {
//!         // Custom execution logic
//!         Ok(json!({"result": 42}))
//!     }),
//! );
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .tools(vec![calculator])
//!     .build()?)?;
//!
//! client.send("Calculate 2+2").await?;
//!
//! while let Some(block) = client.receive().await? {
//!     match block {
//!         ContentBlock::ToolUse(tool_use) => {
//!             println!("Model wants to use: {}", tool_use.name());
//!
//!             // Execute the tool manually
//!             let tool = client.get_tool(tool_use.name()).unwrap();
//!             let result = tool.execute(tool_use.input().clone()).await?;
//!
//!             // Add the result and continue
//!             client.add_tool_result(tool_use.id(), result)?;
//!             client.send("").await?;
//!         }
//!         ContentBlock::Text(text) => {
//!             println!("Response: {}", text.text);
//!         }
//!         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Automatic Tool Execution
//!
//! ```rust,no_run
//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
//! use serde_json::json;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let calculator = Tool::new(
//!     "calculator",
//!     "Performs arithmetic operations",
//!     json!({"type": "object"}),
//!     |_input| Box::pin(async move { Ok(json!({"result": 42})) }),
//! );
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .api_key("sk-...")
//!     .tools(vec![calculator])
//!     .auto_execute_tools(true) // Enable auto-execution
//!     .max_tool_iterations(5)   // At most 5 tool rounds
//!     .build()?)?;
//!
//! client.send("Calculate 2+2 and then multiply by 3").await?;
//!
//! // Tools are executed automatically - you only get the final text response
//! while let Some(block) = client.receive().await? {
//!     if let ContentBlock::Text(text) = block {
//!         println!("{}", text.text);
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## With Hooks
//!
//! ```ignore
//! use open_agent::{Client, AgentOptions, Hooks, HookDecision};
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let hooks = Hooks::new()
//!     .add_user_prompt_submit(|event| async move {
//!         // Block prompts containing certain words
//!         if event.prompt.contains("forbidden") {
//!             return Some(HookDecision::block("Forbidden word detected"));
//!         }
//!         Some(HookDecision::continue_())
//!     })
//!     .add_pre_tool_use(|event| async move {
//!         // Log all tool uses
//!         println!("Executing tool: {}", event.tool_name);
//!         Some(HookDecision::continue_())
//!     });
//!
//! let mut client = Client::new(AgentOptions::builder()
//!     .model("gpt-4")
//!     .base_url("http://localhost:1234/v1")
//!     .hooks(hooks)
//!     .build()?)?;
//!
//! // Hooks will be executed automatically
//! client.send("Hello!").await?;
//! # Ok(())
//! # }
//! ```

use crate::types::{
    AgentOptions, ContentBlock, Message, MessageRole, OpenAIContent, OpenAIContentPart,
    OpenAIFunction, OpenAIMessage, OpenAIRequest, OpenAIToolCall, TextBlock,
};
use crate::utils::{ToolCallAggregator, parse_sse_stream};
use crate::{Error, Result};
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

/// A pinned, boxed stream of content blocks from the model.
///
/// This type alias represents an asynchronous stream that yields `ContentBlock` items.
/// Each item is wrapped in a `Result` to handle potential errors during streaming.
///
/// The stream is:
/// - **Pinned** (`Pin<Box<...>>`): Required for safe async operations and self-referential types
/// - **Boxed**: Allows dynamic dispatch and hides the concrete stream implementation
/// - **Send**: Can be safely transferred between threads
///
/// # Content Blocks
///
/// The stream can yield several types of content blocks:
///
/// - **TextBlock**: Incremental text responses from the model
/// - **ToolUseBlock**: Requests to execute a tool with specific parameters
/// - **ToolResultBlock**: Results from tool execution (in manual mode)
///
/// # Error Handling
///
/// Errors in the stream indicate issues like:
/// - Network failures or timeouts
/// - Malformed SSE events
/// - JSON parsing errors
/// - API errors from the model provider
///
/// When an error occurs, the stream typically terminates. It's the caller's responsibility
/// to handle errors appropriately.
///
/// # Examples
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions, ContentBlock};
/// use futures::StreamExt;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .build()?;
///
/// let mut stream = query("Hello!", &options).await?;
///
/// while let Some(result) = stream.next().await {
///     match result {
///         Ok(ContentBlock::Text(text)) => print!("{}", text.text),
///         Ok(_) => {} // Other block types
///         Err(e) => eprintln!("Stream error: {}", e),
///     }
/// }
/// # Ok(())
/// # }
/// ```
pub type ContentStream = Pin<Box<dyn Stream<Item = Result<ContentBlock>> + Send>>;

/// Simple query function for single-turn interactions without conversation history.
///
/// This is a stateless convenience function for simple queries that don't require
/// multi-turn conversations. It creates a temporary HTTP client, sends a single
/// prompt, and returns a stream of content blocks.
///
/// For multi-turn conversations or more control over the interaction, use [`Client`] instead.
///
/// # Parameters
///
/// - `prompt`: The user's message to send to the model
/// - `options`: Configuration including model, API key, tools, etc.
///
/// # Returns
///
/// Returns a `ContentStream` that yields content blocks as they arrive from the model.
/// The stream must be polled to completion to receive all blocks.
///
/// # Behavior
///
/// 1. Creates a temporary HTTP client with the configured timeout
/// 2. Builds the message array (system prompt + user prompt)
/// 3. Converts tools to OpenAI format if provided
/// 4. Makes an HTTP POST request to `/chat/completions`
/// 5. Parses the Server-Sent Events (SSE) response stream
/// 6. Aggregates chunks into complete content blocks
/// 7. Returns a stream that yields blocks as they complete
///
/// # Error Handling
///
/// This function can return errors for:
/// - HTTP client creation failures
/// - Network errors during the request
/// - API errors (authentication, invalid model, rate limits, etc.)
/// - SSE parsing errors
/// - JSON deserialization errors
///
/// # Performance Notes
///
/// - Creates a new HTTP client for each call (consider using `Client` for repeated queries)
/// - The timeout is configurable via `AgentOptions::timeout` (default: 120 seconds)
/// - Streaming begins immediately; the full response is never buffered
///
/// # Examples
///
/// ## Basic Usage
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions};
/// use futures::StreamExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let options = AgentOptions::builder()
///         .system_prompt("You are a helpful assistant")
///         .model("gpt-4")
///         .api_key("sk-...")
///         .build()?;
///
///     let mut stream = query("What's the capital of France?", &options).await?;
///
///     while let Some(block) = stream.next().await {
///         match block? {
///             open_agent::ContentBlock::Text(text) => {
///                 print!("{}", text.text);
///             }
///             open_agent::ContentBlock::ToolUse(_)
///             | open_agent::ContentBlock::ToolResult(_)
///             | open_agent::ContentBlock::Image(_) => {}
///         }
///     }
///
///     Ok(())
/// }
/// ```
///
/// ## With Tools
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions, Tool, ContentBlock};
/// use futures::StreamExt;
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs calculations",
///     json!({"type": "object"}),
///     |_input| Box::pin(async move { Ok(json!({"result": 42})) }),
/// );
///
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .build()?;
///
/// let mut stream = query("Calculate 2+2", &options).await?;
///
/// while let Some(block) = stream.next().await {
///     match block? {
///         ContentBlock::ToolUse(tool_use) => {
///             println!("Model wants to use: {}", tool_use.name());
///             // Note: you'll need to execute tools manually and continue
///             // the conversation. For automatic execution, use Client.
///         }
///         ContentBlock::Text(text) => print!("{}", text.text),
///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Error Handling
///
/// ```rust,no_run
/// use open_agent::{query, AgentOptions};
/// use futures::StreamExt;
///
/// # async fn example() {
/// let options = AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("invalid-key")
///     .build()
///     .unwrap();
///
/// match query("Hello", &options).await {
///     Ok(mut stream) => {
///         while let Some(result) = stream.next().await {
///             match result {
///                 Ok(block) => println!("Block: {:?}", block),
///                 Err(e) => {
///                     eprintln!("Stream error: {}", e);
///                     break;
///                 }
///             }
///         }
///     }
///     Err(e) => eprintln!("Query failed: {}", e),
/// }
/// # }
/// ```
pub async fn query(prompt: &str, options: &AgentOptions) -> Result<ContentStream> {
    // Create an HTTP client with the configured timeout.
    // The timeout applies to the entire request, not individual chunks.
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(options.timeout()))
        .build()
        .map_err(Error::Http)?;

    // Build the messages array for the API request.
    // The OpenAI format expects an array of message objects with role and content.
    let mut messages = Vec::new();

    // Add the system prompt if provided.
    // System prompts set the assistant's behavior and context.
    if !options.system_prompt().is_empty() {
        messages.push(OpenAIMessage {
            role: "system".to_string(),
            content: Some(OpenAIContent::Text(options.system_prompt().to_string())),
            tool_calls: None,
            tool_call_id: None,
        });
    }

    // Add the user prompt - the actual query from the user.
    messages.push(OpenAIMessage {
        role: "user".to_string(),
        content: Some(OpenAIContent::Text(prompt.to_string())),
        tool_calls: None,
        tool_call_id: None,
    });

    // Convert tools to OpenAI format if any are provided.
    // Tools are described using JSON Schema for parameter validation.
    let tools = if !options.tools().is_empty() {
        Some(
            options
                .tools()
                .iter()
                .map(|t| t.to_openai_format())
                .collect(),
        )
    } else {
        None
    };

    // Build the OpenAI-compatible request payload.
    // stream=true enables Server-Sent Events for incremental responses.
    let request = OpenAIRequest {
        model: options.model().to_string(),
        messages,
        stream: true, // Critical: enables SSE streaming
        max_tokens: options.max_tokens(),
        temperature: Some(options.temperature()),
        tools,
    };

    // Make an HTTP POST request to the chat completions endpoint.
    let url = format!("{}/chat/completions", options.base_url());
    let response = client
        .post(&url)
        .header("Authorization", format!("Bearer {}", options.api_key()))
        .header("Content-Type", "application/json")
        .json(&request)
        .send()
        .await
        .map_err(Error::Http)?;

    // Check for HTTP-level errors before processing the stream.
    // This catches authentication failures, rate limits, invalid models, etc.
    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await.unwrap_or_else(|e| {
            eprintln!("WARNING: Failed to read error response body: {}", e);
            "Unknown error (failed to read response body)".to_string()
        });
        return Err(Error::api(format!("API error {}: {}", status, body)));
    }

    // Parse the Server-Sent Events (SSE) stream.
    // The response body is a stream of "data: {...}" events.
    let sse_stream = parse_sse_stream(response);

    // Aggregate SSE chunks into complete content blocks.
    // ToolCallAggregator handles partial JSON and assembles complete tool calls.
    // The scan() combinator maintains state across stream items.
    let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
        let result = match chunk_result {
            Ok(chunk) => match aggregator.process_chunk(chunk) {
                Ok(blocks) => {
                    if blocks.is_empty() {
                        Some(None) // Intermediate chunk, continue streaming
                    } else {
                        Some(Some(Ok(blocks))) // Complete block(s) ready
                    }
                }
                Err(e) => Some(Some(Err(e))), // Propagate processing error
            },
            Err(e) => Some(Some(Err(e))), // Propagate stream error
        };
        futures::future::ready(result)
    });

    // Flatten the stream to emit individual blocks.
    // filter_map removes None values (incomplete chunks);
    // flat_map expands each Vec<ContentBlock> into individual items.
    let flattened = stream
        .filter_map(|item| async move { item })
        .flat_map(|result| {
            futures::stream::iter(match result {
                Ok(blocks) => blocks.into_iter().map(Ok).collect(),
                Err(e) => vec![Err(e)],
            })
        });

    // Pin and box the stream for type erasure and safe async usage.
    Ok(Box::pin(flattened))
}

/// Stateful client for multi-turn conversations with automatic history management.
///
/// The `Client` is the primary interface for building conversational AI applications.
/// It maintains conversation history, manages streaming responses, and provides two
/// modes of operation: manual and automatic tool execution.
///
/// # State Management
///
/// The client maintains several pieces of state that persist across multiple turns:
///
/// - **Conversation History**: Complete record of all messages exchanged
/// - **Active Stream**: Currently active SSE stream being consumed
/// - **Interrupt Flag**: Thread-safe cancellation signal
/// - **Auto-Execution Buffer**: Cached blocks for auto-execution mode
///
/// # Operating Modes
///
/// ## Manual Mode (default)
///
/// In manual mode, the client streams blocks directly to the caller. When the model
/// requests a tool, you receive a `ToolUseBlock`, execute the tool yourself, add the
/// result with `add_tool_result()`, and continue the conversation.
///
/// **Advantages**:
/// - Full control over tool execution
/// - Custom error handling per tool
/// - Ability to modify tool inputs/outputs
/// - Interactive debugging capabilities
///
/// ## Automatic Mode (`auto_execute_tools = true`)
///
/// In automatic mode, the client executes tools transparently and only returns the
/// final text response after all tool iterations complete.
///
/// **Advantages**:
/// - Simpler API for common use cases
/// - Built-in retry logic via hooks
/// - Automatic conversation continuation
/// - Configurable iteration limits
///
/// # Thread Safety
///
/// The client is NOT thread-safe for concurrent use. However, the interrupt mechanism
/// uses an `Arc<AtomicBool>`, which can be safely shared across threads to signal
/// cancellation.
///
/// # Memory Management
///
/// - History grows unbounded by default (consider clearing it periodically)
/// - Streams are consumed lazily (low memory footprint during streaming)
/// - Auto-execution buffers the entire response (higher memory use in auto mode)
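///
/// Since history grows unbounded, a long-running application can reset it between
/// unrelated tasks. A minimal sketch, assuming the `clear_history()` method referenced
/// in the field documentation (its exact signature may differ):
///
/// ```rust,no_run
/// # use open_agent::{Client, AgentOptions};
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::new(AgentOptions::default())?;
/// client.send("Summarize task one").await?;
/// // ... drain receive() until it returns Ok(None) ...
///
/// // Drop accumulated messages before starting an unrelated task
/// client.clear_history();
/// # Ok(())
/// # }
/// ```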
///
/// # Examples
///
/// ## Basic Multi-Turn Conversation
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .build()?)?;
///
/// // First question
/// client.send("What's the capital of France?").await?;
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "Paris is the capital of France."
///     }
/// }
///
/// // Follow-up question - history is automatically maintained
/// client.send("What's its population?").await?;
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "Paris has approximately 2.2 million people."
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Manual Tool Execution
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs arithmetic",
///     json!({"type": "object"}),
///     |_input| Box::pin(async move { Ok(json!({"result": 42})) }),
/// );
///
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .build()?)?;
///
/// client.send("What's 2+2?").await?;
///
/// while let Some(block) = client.receive().await? {
///     match block {
///         ContentBlock::ToolUse(tool_use) => {
///             // Execute the tool manually
///             let result = json!({"result": 4});
///             client.add_tool_result(tool_use.id(), result)?;
///
///             // Continue the conversation to get the model's response
///             client.send("").await?;
///         }
///         ContentBlock::Text(text) => {
///             println!("{}", text.text); // "The result is 4."
///         }
///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## Automatic Tool Execution
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
/// use serde_json::json;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let calculator = Tool::new(
///     "calculator",
///     "Performs arithmetic",
///     json!({"type": "object"}),
///     |_input| Box::pin(async move { Ok(json!({"result": 42})) }),
/// );
///
/// let mut client = Client::new(AgentOptions::builder()
///     .model("gpt-4")
///     .api_key("sk-...")
///     .tools(vec![calculator])
///     .auto_execute_tools(true) // Enable auto-execution
///     .build()?)?;
///
/// client.send("What's 2+2?").await?;
///
/// // Tools execute automatically - you only receive the final text
/// while let Some(block) = client.receive().await? {
///     if let ContentBlock::Text(text) = block {
///         println!("{}", text.text); // "The result is 4."
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## With Interruption
///
/// ```rust,no_run
/// use open_agent::{Client, AgentOptions};
/// use std::time::Duration;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = Client::new(AgentOptions::default())?;
///
/// // Start a long-running query
/// client.send("Write a very long story").await?;
///
/// // Spawn a task to interrupt after a timeout
/// let interrupt_handle = client.interrupt_handle();
/// tokio::spawn(async move {
///     tokio::time::sleep(Duration::from_secs(5)).await;
///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
/// });
///
/// // This loop will stop when interrupted
/// while let Some(_block) = client.receive().await? {
///     // Process blocks...
/// }
///
/// // The client is still usable after interruption
/// client.send("What's 2+2?").await?;
/// # Ok(())
/// # }
/// ```
pub struct Client {
    /// Configuration options including model, API key, tools, hooks, etc.
    ///
    /// This field contains all the settings that control how the client behaves.
    /// It's set once during construction and cannot be modified (though you can
    /// access it via `options()` for inspection).
    options: AgentOptions,

    /// Complete conversation history as a sequence of messages.
    ///
    /// Each message contains a role (System/User/Assistant/Tool) and content blocks.
    /// History grows unbounded by default - use `clear_history()` to reset.
    ///
    /// **Important**: The history includes ALL messages, not just user/assistant.
    /// This includes tool results and intermediate assistant messages from tool calls.
    history: Vec<Message>,

    /// Currently active SSE stream being consumed.
    ///
    /// This is `Some(stream)` while a response is being received, and `None` when
    /// no request is in flight or after a response completes.
    ///
    /// The stream is set by `send()` and consumed by `receive()`. When the stream
    /// is exhausted, `receive()` returns `Ok(None)` and sets this back to `None`.
    current_stream: Option<ContentStream>,

    /// Reusable HTTP client for making API requests.
    ///
    /// Configured once during construction with the timeout from `AgentOptions`.
    /// Reusing the same client across requests enables connection pooling and
    /// better performance for multi-turn conversations.
    http_client: reqwest::Client,

    /// Thread-safe interrupt flag for cancellation.
    ///
    /// This `Arc<AtomicBool>` can be cloned and shared across threads or async tasks
    /// to signal cancellation. When set to `true`, the next `receive()` call will
    /// return `Ok(None)` and clear the current stream.
    ///
    /// The flag is automatically reset to `false` at the start of each `send()` call.
    ///
    /// **Thread Safety**: Can be safely accessed from multiple threads using atomic
    /// operations. However, only one thread should call `send()`/`receive()`.
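    ///
    /// The flag's semantics can be sketched with plain `std` primitives (no SDK
    /// types involved): a store through one clone of the `Arc` is observed through
    /// the other.
    ///
    /// ```rust
    /// use std::sync::Arc;
    /// use std::sync::atomic::{AtomicBool, Ordering};
    ///
    /// let flag = Arc::new(AtomicBool::new(false));
    /// let handle = Arc::clone(&flag); // same underlying bool
    ///
    /// std::thread::spawn(move || handle.store(true, Ordering::SeqCst))
    ///     .join()
    ///     .unwrap();
    ///
    /// assert!(flag.load(Ordering::SeqCst)); // store is visible across threads
    /// ```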
    interrupted: Arc<AtomicBool>,

    /// Buffer of content blocks for auto-execution mode.
    ///
    /// When `auto_execute_tools` is enabled, `receive()` internally runs the
    /// auto-execution loop, which buffers all final text blocks here. Subsequent
    /// calls to `receive()` return blocks from this buffer one at a time.
    ///
    /// **Only used when `options.auto_execute_tools == true`**.
    ///
    /// The buffer is cleared when starting a new auto-execution loop.
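    ///
    /// Together with `auto_exec_index`, this buffer forms a simple cursor. A
    /// sketch of the read pattern using a plain `Vec` (no SDK types):
    ///
    /// ```rust
    /// let buffer = vec!["block-1", "block-2"];
    /// let mut index = 0;
    /// let mut drained = Vec::new();
    ///
    /// // Each receive() call in auto mode performs one iteration of this loop:
    /// while let Some(block) = buffer.get(index) {
    ///     drained.push(*block);
    ///     index += 1;
    /// }
    ///
    /// assert_eq!(drained, buffer); // every block handed out exactly once
    /// ```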
    auto_exec_buffer: Vec<ContentBlock>,

    /// Current read position in the auto-execution buffer.
    ///
    /// Tracks which block to return next when `receive()` is called in auto mode.
    /// Reset to 0 when the buffer is refilled with a new response.
    ///
    /// **Only used when `options.auto_execute_tools == true`**.
    auto_exec_index: usize,

    /// Accumulator for assistant response blocks in manual mode.
    ///
    /// In manual mode, `receive()` streams blocks one at a time to the caller.
    /// This buffer collects those blocks so that when the stream ends, the
    /// complete assistant message can be added to conversation history.
    ///
    /// **Only used when `options.auto_execute_tools == false`**.
    manual_receive_buffer: Vec<ContentBlock>,
}

impl Client {
    /// Creates a new client with the specified configuration.
    ///
    /// This constructor initializes all state fields and creates a reusable HTTP client
    /// configured with the timeout from `AgentOptions`.
    ///
    /// # Parameters
    ///
    /// - `options`: Configuration including model, API key, tools, hooks, etc.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot be built. This can happen due to:
    /// - Invalid TLS configuration
    /// - System resource exhaustion
    /// - Invalid timeout values
    ///
    /// # Examples
    ///
    /// ```rust
    /// use open_agent::{Client, AgentOptions};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::builder()
    ///     .model("gpt-4")
    ///     .base_url("http://localhost:1234/v1")
    ///     .build()?)?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(options: AgentOptions) -> Result<Self> {
        // Build the HTTP client with the configured timeout.
        // This client is reused across all requests for connection pooling.
        let http_client = reqwest::Client::builder()
            .timeout(Duration::from_secs(options.timeout()))
            .build()
            .map_err(|e| Error::config(format!("Failed to build HTTP client: {}", e)))?;

        Ok(Self {
            options,
            history: Vec::new(),  // Empty conversation history
            current_stream: None, // No active stream yet
            http_client,
            interrupted: Arc::new(AtomicBool::new(false)), // Not interrupted initially
            auto_exec_buffer: Vec::new(),                  // Empty buffer for auto mode
            auto_exec_index: 0,                            // Start at the beginning of the buffer
            manual_receive_buffer: Vec::new(),             // Empty buffer for manual mode
        })
    }
954
955 /// Sends a user message and initiates streaming of the model's response.
956 ///
957 /// This method performs several critical steps:
958 ///
959 /// 1. Executes UserPromptSubmit hooks (which can modify or block the prompt)
960 /// 2. Adds the user message to conversation history
961 /// 3. Builds and sends HTTP request to the OpenAI-compatible API
962 /// 4. Parses the SSE stream and sets up aggregation
963 /// 5. Stores the stream for consumption via `receive()`
964 ///
965 /// # Parameters
966 ///
967 /// - `prompt`: The user's message. Can be empty to continue conversation after
968 /// adding tool results (common pattern in manual tool execution mode).
969 ///
970 /// # Returns
971 ///
972 /// - `Ok(())`: Request sent successfully, call `receive()` to get blocks
973 /// - `Err(e)`: Request failed (network error, API error, hook blocked, etc.)
974 ///
975 /// # Behavior Details
976 ///
977 /// ## Hook Execution
978 ///
979 /// Before sending, UserPromptSubmit hooks are executed. Hooks can:
980 /// - Modify the prompt text
981 /// - Block the request entirely
982 /// - Access conversation history
983 ///
984 /// If a hook blocks the request, this method returns an error immediately.
985 ///
986 /// ## History Management
987 ///
988 /// The prompt is added to history BEFORE sending the request. This ensures
989 /// that history is consistent even if the request fails.
990 ///
991 /// ## Stream Setup
992 ///
993 /// The response stream is set up but not consumed. You must call `receive()`
994 /// repeatedly to get content blocks. The stream remains active until:
995 /// - All blocks are consumed (stream naturally ends)
996 /// - An error occurs
997 /// - Interrupt is triggered
998 ///
999 /// ## Interrupt Handling
1000 ///
1001 /// The interrupt flag is reset to `false` at the start of this method,
1002 /// allowing a fresh request after a previous interruption.
1003 ///
1004 /// # State Changes
1005 ///
/// - Resets `interrupted` flag to `false`
/// - Clears `manual_receive_buffer` and discards any abandoned stream
/// - Appends user message to `history`
/// - Sets `current_stream` to the new SSE stream
/// - Does NOT modify `auto_exec_buffer` or `auto_exec_index`
1010 ///
1011 /// # Errors
1012 ///
1013 /// Returns errors for:
1014 /// - Hook blocking the prompt
1015 /// - HTTP client errors (network failure, DNS, etc.)
1016 /// - API errors (auth failure, invalid model, rate limits)
1017 /// - Invalid response format
1018 ///
1019 /// After an error, the client remains usable for new requests.
1020 ///
1021 /// # Examples
1022 ///
1023 /// ## Basic Usage
1024 ///
1025 /// ```rust,no_run
1026 /// # use open_agent::{Client, AgentOptions};
1027 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1028 /// # let mut client = Client::new(AgentOptions::default())?;
1029 /// client.send("Hello!").await?;
1030 ///
1031 /// while let Some(block) = client.receive().await? {
1032 /// // Process blocks...
1033 /// }
1034 /// # Ok(())
1035 /// # }
1036 /// ```
1037 ///
1038 /// ## Continuing After Tool Result
1039 ///
1040 /// ```rust,no_run
1041 /// # use open_agent::{Client, AgentOptions, ContentBlock};
1042 /// # use serde_json::json;
1043 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1044 /// # let mut client = Client::new(AgentOptions::default())?;
1045 /// client.send("Use the calculator").await?;
1046 ///
1047 /// while let Some(block) = client.receive().await? {
1048 /// if let ContentBlock::ToolUse(tool_use) = block {
1049 /// // Execute tool and add result
1050 /// client.add_tool_result(tool_use.id(), json!({"result": 42}))?;
1051 ///
1052 /// // Continue conversation with empty prompt
1053 /// client.send("").await?;
1054 /// }
1055 /// }
1056 /// # Ok(())
1057 /// # }
1058 /// ```
1059 pub async fn send(&mut self, prompt: &str) -> Result<()> {
1060 use crate::hooks::UserPromptSubmitEvent;
1061
1062 // Reset interrupt flag for new query
1063 // This allows the client to be reused after a previous interruption
1064 // Uses SeqCst ordering to ensure visibility across all threads
1065 self.interrupted.store(false, Ordering::SeqCst);
1066
1067 // Discard any leftover manual-mode blocks from an abandoned stream.
1068 // If the prior stream completed normally, receive() already committed
1069 // the buffer to history on EOF. If the caller is calling send() before
1070 // the stream finished, the buffer is partial and must not be persisted.
1071 self.manual_receive_buffer.clear();
1072 self.current_stream = None;
1073
1074 // Execute UserPromptSubmit hooks
1075 // Hooks run BEFORE adding to history, allowing modification or blocking
1076 let mut final_prompt = prompt.to_string();
1077 let history_snapshot: Vec<serde_json::Value> = self
1078 .history
1079 .iter()
1080 .map(|_| serde_json::json!({})) // Simplified snapshot for hooks
1081 .collect();
1082
1083 // Create hook event with current prompt and history
1084 let event = UserPromptSubmitEvent::new(final_prompt.clone(), history_snapshot);
1085
1086 // Execute all registered UserPromptSubmit hooks
1087 if let Some(decision) = self.options.hooks().execute_user_prompt_submit(event).await {
1088 // Check if hook wants to block execution
1089 if !decision.continue_execution() {
1090 return Err(Error::other(format!(
1091 "Prompt blocked by hook: {}",
decision.reason().unwrap_or("no reason provided")
1093 )));
1094 }
1095 // Apply any prompt modifications from hooks
1096 if let Some(modified) = decision.modified_prompt() {
1097 final_prompt = modified.to_string();
1098 }
1099 }
1100
1101 // Add user message to history BEFORE sending request
1102 // This ensures history consistency even if request fails
1103 // Empty prompts are still added (needed for tool continuation)
1104 self.history.push(Message::user(final_prompt));
1105
1106 // Build messages array for API request
1107 // This includes system prompt + full conversation history
1108 let mut messages = Vec::new();
1109
1110 // Add system prompt as first message if configured
1111 // System prompts are added fresh for each request (not from history)
1112 if !self.options.system_prompt().is_empty() {
1113 messages.push(OpenAIMessage {
1114 role: "system".to_string(),
1115 content: Some(OpenAIContent::Text(
1116 self.options.system_prompt().to_string(),
1117 )),
1118 tool_calls: None,
1119 tool_call_id: None,
1120 });
1121 }
1122
1123 // Convert conversation history to OpenAI message format
1124 // This includes user prompts, assistant responses, and tool results
1125 for msg in &self.history {
1126 // Separate blocks by type to determine message structure
1127 let mut text_blocks = Vec::new();
1128 let mut image_blocks = Vec::new();
1129 let mut tool_use_blocks = Vec::new();
1130 let mut tool_result_blocks = Vec::new();
1131
1132 for block in &msg.content {
1133 match block {
1134 ContentBlock::Text(text) => text_blocks.push(text),
1135 ContentBlock::Image(image) => image_blocks.push(image),
1136 ContentBlock::ToolUse(tool_use) => tool_use_blocks.push(tool_use),
1137 ContentBlock::ToolResult(tool_result) => tool_result_blocks.push(tool_result),
1138 }
1139 }
1140
1141 // Handle different message types based on content blocks
1142 // Case 1: Message contains tool results (should be separate tool messages)
1143 if !tool_result_blocks.is_empty() {
1144 for tool_result in tool_result_blocks {
1145 // Serialize the tool result content as JSON string
1146 let content =
1147 serde_json::to_string(tool_result.content()).unwrap_or_else(|e| {
1148 format!("{{\"error\": \"Failed to serialize: {}\"}}", e)
1149 });
1150
1151 messages.push(OpenAIMessage {
1152 role: "tool".to_string(),
1153 content: Some(OpenAIContent::Text(content)),
1154 tool_calls: None,
1155 tool_call_id: Some(tool_result.tool_use_id().to_string()),
1156 });
1157 }
1158 }
1159 // Case 2: Message contains tool use blocks (assistant with tool calls)
1160 else if !tool_use_blocks.is_empty() {
1161 // Build tool_calls array
1162 let tool_calls: Vec<OpenAIToolCall> = tool_use_blocks
1163 .iter()
1164 .map(|tool_use| {
1165 // Serialize the input as a JSON string (OpenAI API requirement)
1166 let arguments = serde_json::to_string(tool_use.input())
1167 .unwrap_or_else(|_| "{}".to_string());
1168
1169 OpenAIToolCall {
1170 id: tool_use.id().to_string(),
1171 call_type: "function".to_string(),
1172 function: OpenAIFunction {
1173 name: tool_use.name().to_string(),
1174 arguments,
1175 },
1176 }
1177 })
1178 .collect();
1179
1180 // Extract any text content (some models include reasoning before tool calls)
1181 // Note: OpenAI API requires content field even if empty when tool_calls present
1182 let content = if !text_blocks.is_empty() {
1183 let text = text_blocks
1184 .iter()
1185 .map(|t| t.text.as_str())
1186 .collect::<Vec<_>>()
1187 .join("\n");
1188 Some(OpenAIContent::Text(text))
1189 } else {
1190 // Empty string satisfies OpenAI API schema (content is required)
1191 Some(OpenAIContent::Text(String::new()))
1192 };
1193
1194 messages.push(OpenAIMessage {
1195 role: "assistant".to_string(),
1196 content,
1197 tool_calls: Some(tool_calls),
1198 tool_call_id: None,
1199 });
1200 }
1201 // Case 3: Message contains images (use OpenAIContent::Parts)
1202 else if !image_blocks.is_empty() {
1203 // Log debug info about images being serialized
1204 log::debug!(
1205 "Serializing message with {} image(s) for {:?} role",
1206 image_blocks.len(),
1207 msg.role
1208 );
1209
1210 // Build content parts array preserving original order
1211 let mut content_parts = Vec::new();
1212
1213 // Re-iterate through content blocks to maintain order
1214 for block in &msg.content {
1215 match block {
1216 ContentBlock::Text(text) => {
1217 content_parts.push(OpenAIContentPart::text(&text.text));
1218 }
1219 ContentBlock::Image(image) => {
// Log image details (truncate URL for privacy); back up to a char
// boundary so slicing can't panic inside a multi-byte UTF-8 char
let url = image.url();
let url_display = if url.len() > 100 {
    let mut cut = 100;
    while !url.is_char_boundary(cut) { cut -= 1; }
    format!("{}... ({} chars)", &url[..cut], url.len())
} else {
    url.to_string()
};
1226 let detail_str = match image.detail() {
1227 crate::types::ImageDetail::Low => "low",
1228 crate::types::ImageDetail::High => "high",
1229 crate::types::ImageDetail::Auto => "auto",
1230 };
1231 log::debug!(" - Image: {} (detail: {})", url_display, detail_str);
1232
1233 content_parts.push(OpenAIContentPart::from_image(image));
1234 }
1235 ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
1236 }
1237 }
1238
1239 // Defensive check: content_parts should never be empty at this point
1240 // If it is, it indicates a logic error (e.g., all blocks were filtered out)
1241 if content_parts.is_empty() {
1242 return Err(Error::other(
1243 "Internal error: Message with images produced empty content array",
1244 ));
1245 }
1246
1247 let role_str = match msg.role {
1248 MessageRole::System => "system",
1249 MessageRole::User => "user",
1250 MessageRole::Assistant => "assistant",
1251 MessageRole::Tool => "tool",
1252 };
1253
1254 messages.push(OpenAIMessage {
1255 role: role_str.to_string(),
1256 content: Some(OpenAIContent::Parts(content_parts)),
1257 tool_calls: None,
1258 tool_call_id: None,
1259 });
1260 }
1261 // Case 4: Message contains only text (normal message, backward compatible)
1262 else {
1263 let content = text_blocks
1264 .iter()
1265 .map(|t| t.text.as_str())
1266 .collect::<Vec<_>>()
1267 .join("\n");
1268
1269 let role_str = match msg.role {
1270 MessageRole::System => "system",
1271 MessageRole::User => "user",
1272 MessageRole::Assistant => "assistant",
1273 MessageRole::Tool => "tool",
1274 };
1275
1276 messages.push(OpenAIMessage {
1277 role: role_str.to_string(),
1278 content: Some(OpenAIContent::Text(content)),
1279 tool_calls: None,
1280 tool_call_id: None,
1281 });
1282 }
1283 }
1284
1285 // Convert tools to OpenAI format if any are registered
1286 // Each tool is described with name, description, and JSON Schema parameters
1287 let tools = if !self.options.tools().is_empty() {
1288 Some(
1289 self.options
1290 .tools()
1291 .iter()
1292 .map(|t| t.to_openai_format())
1293 .collect(),
1294 )
1295 } else {
1296 None
1297 };
1298
1299 // Build the OpenAI-compatible request payload
1300 let request = OpenAIRequest {
1301 model: self.options.model().to_string(),
1302 messages,
1303 stream: true, // Always stream for progressive rendering
1304 max_tokens: self.options.max_tokens(),
1305 temperature: Some(self.options.temperature()),
1306 tools,
1307 };
1308
1309 // Make HTTP POST request to chat completions endpoint
1310 let url = format!("{}/chat/completions", self.options.base_url());
1311 let response = self
1312 .http_client
1313 .post(&url)
1314 .header(
1315 "Authorization",
1316 format!("Bearer {}", self.options.api_key()),
1317 )
1318 .header("Content-Type", "application/json")
1319 .json(&request)
1320 .send()
1321 .await
1322 .map_err(Error::Http)?;
1323
1324 // Check for HTTP-level errors before processing stream
1325 // This catches authentication, rate limits, invalid models, etc.
1326 if !response.status().is_success() {
1327 let status = response.status();
1328 let body = response.text().await.unwrap_or_else(|e| {
log::warn!("Failed to read error response body: {}", e);
1330 "Unknown error (failed to read response body)".to_string()
1331 });
1332 return Err(Error::api(format!("API error {}: {}", status, body)));
1333 }
1334
1335 // Parse Server-Sent Events (SSE) stream from response
1336 let sse_stream = parse_sse_stream(response);
1337
1338 // Aggregate SSE chunks into complete content blocks
1339 // ToolCallAggregator maintains state to handle incremental JSON chunks
1340 // that may arrive split across multiple SSE events
1341 let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
1342 let result = match chunk_result {
1343 Ok(chunk) => match aggregator.process_chunk(chunk) {
1344 Ok(blocks) => {
1345 if blocks.is_empty() {
1346 Some(None) // Partial chunk, keep aggregating
1347 } else {
1348 Some(Some(Ok(blocks))) // Complete block(s) ready
1349 }
1350 }
1351 Err(e) => Some(Some(Err(e))), // Processing error
1352 },
1353 Err(e) => Some(Some(Err(e))), // Stream error
1354 };
1355 futures::future::ready(result)
1356 });
1357
1358 // Flatten the stream to emit individual blocks
1359 // filter_map removes None values (partial chunks)
1360 // flat_map converts Vec<ContentBlock> to individual items
1361 let flattened = stream
1362 .filter_map(|item| async move { item })
1363 .flat_map(|result| {
1364 futures::stream::iter(match result {
1365 Ok(blocks) => blocks.into_iter().map(Ok).collect(),
1366 Err(e) => vec![Err(e)],
1367 })
1368 });
1369
1370 // Store the stream for consumption via receive()
1371 // The stream is NOT consumed here - that happens in receive()
1372 self.current_stream = Some(Box::pin(flattened));
1373
1374 Ok(())
1375 }
1376
1377 /// Internal method that returns one block from the current stream.
1378 ///
1379 /// This is the core streaming logic extracted for reuse by both manual mode
1380 /// and auto-execution mode. It handles interrupt checking and stream consumption.
1381 ///
1382 /// # Returns
1383 ///
1384 /// - `Ok(Some(block))`: Successfully received a content block
1385 /// - `Ok(None)`: Stream ended naturally or was interrupted
1386 /// - `Err(e)`: An error occurred during streaming
1387 ///
1388 /// # State Changes
1389 ///
1390 /// - Sets `current_stream` to `None` if interrupted or stream ends
1391 /// - Does not modify history or other state
1392 ///
1393 /// # Implementation Notes
1394 ///
1395 /// This method checks the interrupt flag on every call, allowing responsive
1396 /// cancellation. The check uses SeqCst ordering for immediate visibility of
1397 /// interrupts from other threads.
1398 async fn receive_one(&mut self) -> Result<Option<ContentBlock>> {
1399 // Check interrupt flag before attempting to receive
1400 // Uses SeqCst to ensure we see the latest value from any thread
1401 if self.interrupted.load(Ordering::SeqCst) {
1402 // Return None but leave current_stream intact so callers can
1403 // distinguish "interrupted a live stream" (current_stream is Some)
1404 // from "interrupt after stream already ended" (current_stream is None).
1405 return Ok(None);
1406 }
1407
1408 // Poll the current stream if one exists
1409 if let Some(stream) = &mut self.current_stream {
1410 match stream.next().await {
1411 Some(Ok(block)) => Ok(Some(block)),
1412 Some(Err(e)) => Err(e),
1413 None => {
1414 // Natural EOF — mark stream as fully consumed
1415 self.current_stream = None;
1416 Ok(None)
1417 }
1418 }
1419 } else {
1420 // No active stream
1421 Ok(None)
1422 }
1423 }
1424
1425 /// Collects all blocks from the current stream into a vector.
1426 ///
1427 /// Internal helper for auto-execution mode. This method buffers the entire
1428 /// response in memory, which is necessary to determine if the response contains
1429 /// tool calls before returning anything to the caller.
1430 ///
1431 /// # Returns
1432 ///
1433 /// - `Ok(vec)`: Successfully collected all blocks
1434 /// - `Err(e)`: Error during collection or interrupted
1435 ///
1436 /// # Memory Usage
1437 ///
1438 /// This buffers the entire response, which can be large for long completions.
1439 /// Consider the memory implications when using auto-execution mode.
1440 ///
1441 /// # Interruption
1442 ///
1443 /// Checks interrupt flag during collection and returns error if interrupted.
1444 async fn collect_all_blocks(&mut self) -> Result<Vec<ContentBlock>> {
1445 let mut blocks = Vec::new();
1446
1447 // Consume entire stream into vector
1448 while let Some(block) = self.receive_one().await? {
1449 // Check interrupt during collection for responsiveness
1450 if self.interrupted.load(Ordering::SeqCst) {
1451 self.current_stream = None;
1452 return Err(Error::other(
1453 "Operation interrupted during block collection",
1454 ));
1455 }
1456
1457 blocks.push(block);
1458 }
1459
1460 Ok(blocks)
1461 }
1462
1463 /// Executes a tool by name with the given input.
1464 ///
1465 /// Internal helper for auto-execution mode. Looks up the tool in the registered
1466 /// tools list and executes it with the provided input.
1467 ///
1468 /// # Parameters
1469 ///
1470 /// - `tool_name`: Name of the tool to execute
1471 /// - `input`: JSON value containing tool parameters
1472 ///
1473 /// # Returns
1474 ///
1475 /// - `Ok(result)`: Tool executed successfully, returns result as JSON
1476 /// - `Err(e)`: Tool not found or execution failed
1477 ///
1478 /// # Error Handling
1479 ///
1480 /// If the tool is not found in the registry, returns a ToolError.
1481 /// If execution fails, the error from the tool is propagated.
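///
/// The lookup itself is a simple scan by name. A std-only sketch, with a
/// hypothetical `(name, fn)` pair standing in for a registered tool:
///
/// ```rust
/// let tools: Vec<(&str, fn(i64) -> i64)> = vec![("double", |x| x * 2)];
///
/// let tool = tools
///     .iter()
///     .find(|(name, _)| *name == "double")
///     .expect("tool not found");
///
/// assert_eq!((tool.1)(21), 42);
/// ```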
1482 async fn execute_tool_internal(
1483 &self,
1484 tool_name: &str,
1485 input: serde_json::Value,
1486 ) -> Result<serde_json::Value> {
1487 // Find tool in registered tools by name
1488 let tool = self
1489 .options
1490 .tools()
1491 .iter()
1492 .find(|t| t.name() == tool_name)
1493 .ok_or_else(|| Error::tool(format!("Tool '{}' not found", tool_name)))?;
1494
1495 // Execute the tool's async function
1496 tool.execute(input).await
1497 }
1498
1499 /// Auto-execution loop that handles tool calls automatically.
1500 ///
1501 /// This is the core implementation of automatic tool execution mode. It:
1502 ///
1503 /// 1. Collects all blocks from the current stream
1504 /// 2. Separates text blocks from tool use blocks
1505 /// 3. If there are tool blocks:
1506 /// - Executes PreToolUse hooks (can modify/block)
1507 /// - Executes each tool via its registered function
1508 /// - Executes PostToolUse hooks (can modify result)
1509 /// - Adds results to history
1510 /// - Continues conversation with send("")
1511 /// 4. Repeats until text-only response or max iterations
1512 /// 5. Returns all final text blocks
1513 ///
1514 /// # Returns
1515 ///
1516 /// - `Ok(blocks)`: Final text blocks after all tool iterations
1517 /// - `Err(e)`: Error during execution, stream processing, or interruption
1518 ///
1519 /// # Iteration Limit
1520 ///
1521 /// The loop is bounded by `options.max_tool_iterations` to prevent infinite loops.
1522 /// When the limit is reached, the loop stops and returns whatever text blocks
1523 /// have been collected so far.
1524 ///
1525 /// # Hook Integration
1526 ///
1527 /// Hooks are executed for each tool call:
1528 /// - **PreToolUse**: Can modify input or block execution entirely
1529 /// - **PostToolUse**: Can modify the result before it's added to history
1530 ///
1531 /// If a hook blocks execution, a JSON error response is used as the tool result.
1532 ///
1533 /// # State Management
1534 ///
1535 /// The loop maintains history by adding:
1536 /// - Assistant messages with text + tool use blocks
1537 /// - User messages with tool result blocks
1538 ///
1539 /// This creates a proper conversation flow that the model can follow.
1540 ///
1541 /// # Error Recovery
1542 ///
1543 /// If a tool execution fails, the error is converted to a JSON error response
1544 /// and added as the tool result. This allows the conversation to continue
1545 /// and lets the model handle the error.
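///
/// The control flow reduces to a bounded loop. A std-only sketch where
/// each `true` entry simulates a turn that requests tools and `false` a
/// text-only final answer:
///
/// ```rust
/// let turns = [true, true, false];
/// let max_iterations = 5;
///
/// let mut iteration = 0;
/// let mut finished = false;
/// for wants_tools in turns {
///     if !wants_tools {
///         finished = true; // Text-only response: final answer reached
///         break;
///     }
///     iteration += 1;
///     if iteration > max_iterations {
///         break; // Give up rather than loop forever
///     }
///     // Execute tools, append results to history, re-send...
/// }
///
/// assert!(finished);
/// assert_eq!(iteration, 2);
/// ```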
1546 async fn auto_execute_loop(&mut self) -> Result<Vec<ContentBlock>> {
1547 use crate::types::ToolResultBlock;
1548
1549 // Track iterations to prevent infinite loops
1550 let mut iteration = 0;
1551 let max_iterations = self.options.max_tool_iterations();
1552
1553 loop {
1554 // ========================================================================
1555 // STEP 1: Collect all blocks from current stream
1556 // ========================================================================
1557 // Buffer the entire response to determine if it contains tool calls
1558 let blocks = self.collect_all_blocks().await?;
1559
1560 // Empty response means stream ended or was interrupted
1561 if blocks.is_empty() {
1562 return Ok(Vec::new());
1563 }
1564
1565 // ========================================================================
1566 // STEP 2: Separate text blocks from tool use blocks
1567 // ========================================================================
1568 // The model can return a mix of text and tool calls in one response
1569 let mut text_blocks = Vec::new();
1570 let mut tool_blocks = Vec::new();
1571
1572 for block in blocks {
1573 match block {
1574 ContentBlock::Text(_) => text_blocks.push(block),
1575 ContentBlock::ToolUse(_) => tool_blocks.push(block),
1576 ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {} // Ignore ToolResult and Image variants
1577 }
1578 }
1579
1580 // ========================================================================
1581 // STEP 3: Check if we're done (no tool calls)
1582 // ========================================================================
1583 // If the response contains no tool calls, we've reached the final answer
1584 if tool_blocks.is_empty() {
1585 // Add assistant's final text response to history
1586 if !text_blocks.is_empty() {
1587 let assistant_msg = Message::assistant(text_blocks.clone());
1588 self.history.push(assistant_msg);
1589 }
1590 // Return text blocks to caller via buffered receive()
1591 return Ok(text_blocks);
1592 }
1593
1594 // ========================================================================
1595 // STEP 4: Check iteration limit BEFORE executing tools
1596 // ========================================================================
1597 // Increment counter and check if we've hit the max
1598 iteration += 1;
1599 if iteration > max_iterations {
1600 // Max iterations reached - stop execution and return what we have
1601 // This prevents infinite tool-calling loops
1602 if !text_blocks.is_empty() {
1603 let assistant_msg = Message::assistant(text_blocks.clone());
1604 self.history.push(assistant_msg);
1605 }
1606 return Ok(text_blocks);
1607 }
1608
1609 // ========================================================================
1610 // STEP 5: Add assistant message to history
1611 // ========================================================================
1612 // The assistant message includes BOTH text and tool use blocks
1613 // This preserves the full context for future turns
1614 let mut all_blocks = text_blocks.clone();
1615 all_blocks.extend(tool_blocks.clone());
1616 let assistant_msg = Message::assistant(all_blocks);
1617 self.history.push(assistant_msg);
1618
1619 // ========================================================================
1620 // STEP 6: Execute all tools and collect results
1621 // ========================================================================
1622 for block in tool_blocks {
1623 if let ContentBlock::ToolUse(tool_use) = block {
1624 // Create simplified history snapshot for hooks
1625 // TODO: Full serialization of history for hooks
1626 let history_snapshot: Vec<serde_json::Value> =
1627 self.history.iter().map(|_| serde_json::json!({})).collect();
1628
1629 // ============================================================
1630 // Execute PreToolUse hooks
1631 // ============================================================
1632 use crate::hooks::PreToolUseEvent;
1633 let pre_event = PreToolUseEvent::new(
1634 tool_use.name().to_string(),
1635 tool_use.input().clone(),
1636 tool_use.id().to_string(),
1637 history_snapshot.clone(),
1638 );
1639
1640 // Track whether to execute and what input to use
1641 let mut tool_input = tool_use.input().clone();
1642 let mut should_execute = true;
1643 let mut block_reason = None;
1644
1645 // Execute all PreToolUse hooks
1646 if let Some(decision) =
1647 self.options.hooks().execute_pre_tool_use(pre_event).await
1648 {
1649 if !decision.continue_execution() {
1650 // Hook blocked execution
1651 should_execute = false;
1652 block_reason = decision.reason().map(|s| s.to_string());
1653 } else if let Some(modified) = decision.modified_input() {
1654 // Hook modified the input
1655 tool_input = modified.clone();
1656 }
1657 }
1658
1659 // ============================================================
1660 // Execute tool (or create error result if blocked)
1661 // ============================================================
1662 let result = if should_execute {
1663 // Actually execute the tool
1664 match self
1665 .execute_tool_internal(tool_use.name(), tool_input.clone())
1666 .await
1667 {
1668 Ok(res) => res, // Success - use the result
1669 Err(e) => {
1670 // Tool execution failed - convert to JSON error
1671 // This allows the conversation to continue
1672 serde_json::json!({
1673 "error": e.to_string(),
1674 "tool": tool_use.name(),
1675 "id": tool_use.id()
1676 })
1677 }
1678 }
1679 } else {
1680 // Tool blocked by PreToolUse hook - create error result
1681 serde_json::json!({
1682 "error": "Tool execution blocked by hook",
1683 "reason": block_reason.unwrap_or_else(|| "No reason provided".to_string()),
1684 "tool": tool_use.name(),
1685 "id": tool_use.id()
1686 })
1687 };
1688
1689 // ============================================================
1690 // Execute PostToolUse hooks
1691 // ============================================================
1692 use crate::hooks::PostToolUseEvent;
1693 let post_event = PostToolUseEvent::new(
1694 tool_use.name().to_string(),
1695 tool_input,
1696 tool_use.id().to_string(),
1697 result.clone(),
1698 history_snapshot,
1699 );
1700
1701 let mut final_result = result;
1702 if let Some(decision) =
1703 self.options.hooks().execute_post_tool_use(post_event).await
1704 {
1705 // PostToolUse can modify the result
1706 // Note: Uses modified_input field (naming is historical)
1707 if let Some(modified) = decision.modified_input() {
1708 final_result = modified.clone();
1709 }
1710 }
1711
1712 // ============================================================
1713 // Add tool result to history
1714 // ============================================================
1715 // Tool results are added as user messages (per OpenAI convention)
1716 let tool_result = ToolResultBlock::new(tool_use.id(), final_result);
1717 let tool_result_msg =
1718 Message::user_with_blocks(vec![ContentBlock::ToolResult(tool_result)]);
1719 self.history.push(tool_result_msg);
1720 }
1721 }
1722
1723 // ========================================================================
1724 // STEP 7: Continue conversation to get next response
1725 // ========================================================================
1726 // Send empty string to continue - the history contains all context
1727 self.send("").await?;
1728
1729 // Loop continues to collect and process the next response
1730 // This will either be more tool calls or the final text answer
1731 }
1732 }
1733
1734 /// Receives the next content block from the current stream.
1735 ///
1736 /// This is the primary method for consuming responses from the model. It works
1737 /// differently depending on the operating mode:
1738 ///
1739 /// ## Manual Mode (default)
1740 ///
1741 /// Streams blocks directly from the API response as they arrive. You receive:
1742 /// - `TextBlock`: Incremental text from the model
1743 /// - `ToolUseBlock`: Requests to execute tools
1744 /// - Other block types as they're emitted
1745 ///
1746 /// When you receive a `ToolUseBlock`, you must:
1747 /// 1. Execute the tool yourself
1748 /// 2. Call `add_tool_result()` with the result
1749 /// 3. Call `send("")` to continue the conversation
1750 ///
1751 /// ## Automatic Mode (`auto_execute_tools = true`)
1752 ///
1753 /// Transparently executes tools and only returns final text blocks. The first
1754 /// call to `receive()` triggers the auto-execution loop which:
1755 /// 1. Collects all blocks from the stream
1756 /// 2. Executes any tool calls automatically
1757 /// 3. Continues the conversation until reaching a text-only response
1758 /// 4. Buffers the final text blocks
1759 /// 5. Returns them one at a time on subsequent `receive()` calls
1760 ///
1761 /// # Returns
1762 ///
1763 /// - `Ok(Some(block))`: Successfully received a content block
1764 /// - `Ok(None)`: Stream ended normally or was interrupted
1765 /// - `Err(e)`: An error occurred during streaming or tool execution
1766 ///
1767 /// # Behavior Details
1768 ///
1769 /// ## Interruption
1770 ///
/// Checks the interrupt flag on every call. If interrupted, immediately returns
/// `Ok(None)` without polling the stream. The client can be reused after
/// interruption by calling `send()` again.
1773 ///
1774 /// ## Stream Lifecycle
1775 ///
1776 /// 1. After `send()`, stream is active
1777 /// 2. Each `receive()` call yields one block
1778 /// 3. When stream ends, returns `Ok(None)`
1779 /// 4. Subsequent calls continue returning `Ok(None)` until next `send()`
1780 ///
1781 /// ## Auto-Execution Buffer
1782 ///
    /// In auto mode, blocks are buffered in memory. The buffer persists until every
    /// block has been returned; after that, further `receive()` calls yield `Ok(None)`.
    ///
    /// # State Changes
    ///
    /// - Advances stream position
    /// - In auto mode: May trigger entire execution loop and modify history
    /// - In manual mode: Only reads from stream, no history changes
    /// - Increments `auto_exec_index` when returning buffered blocks
    ///
    /// # Examples
    ///
    /// ## Manual Mode - Basic
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Hello!").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     match block {
    ///         ContentBlock::Text(text) => print!("{}", text.text),
    ///         ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Manual Mode - With Tools
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # use serde_json::json;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Use the calculator").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     match block {
    ///         ContentBlock::Text(text) => {
    ///             println!("{}", text.text);
    ///         }
    ///         ContentBlock::ToolUse(tool_use) => {
    ///             println!("Executing: {}", tool_use.name());
    ///
    ///             // Execute tool manually
    ///             let result = json!({"result": 42});
    ///
    ///             // Add result and continue
    ///             client.add_tool_result(tool_use.id(), result)?;
    ///             client.send("").await?;
    ///         }
    ///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Auto Mode
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock, Tool};
    /// # use serde_json::json;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::builder()
    ///     .auto_execute_tools(true)
    ///     .build()?)?;
    ///
    /// client.send("Calculate 2+2").await?;
    ///
    /// // Tools execute automatically - you only get final text
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::Text(text) = block {
    ///         println!("{}", text.text);
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## With Error Handling
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Hello").await?;
    ///
    /// loop {
    ///     match client.receive().await {
    ///         Ok(Some(block)) => {
    ///             // Process block
    ///         }
    ///         Ok(None) => {
    ///             // Stream ended
    ///             break;
    ///         }
    ///         Err(e) => {
    ///             eprintln!("Error: {}", e);
    ///             break;
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Sends a pre-built message to the AI model.
    ///
    /// This method allows sending messages with images or custom content blocks
    /// that cannot be expressed as simple text prompts. Use the `Message` helper
    /// methods like [`user_with_image()`](Message::user_with_image),
    /// [`user_with_image_detail()`](Message::user_with_image_detail), or
    /// [`user_with_base64_image()`](Message::user_with_base64_image) to create
    /// messages with multimodal content.
    ///
    /// Unlike [`send()`](Client::send), this method:
    /// - Accepts pre-built `Message` objects instead of text prompts
    /// - Bypasses `UserPromptSubmit` hooks (since the message is already constructed)
    /// - Enables multimodal interactions (text + images)
    ///
    /// After calling this method, use [`receive()`](Client::receive) to get the
    /// response content blocks.
    ///
    /// # Arguments
    ///
    /// * `message` - A pre-built message (typically created with `Message::user_with_image()` or similar helpers)
    ///
    /// # Errors
    ///
    /// Returns `Error` if:
    /// - Network request fails
    /// - Server returns an error
    /// - Response cannot be parsed
    /// - Request is interrupted via [`interrupt()`](Client::interrupt)
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, Message, ImageDetail};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let options = AgentOptions::builder()
    ///     .model("gpt-4-vision-preview")
    ///     .base_url("http://localhost:1234/v1")
    ///     .build()?;
    ///
    /// let mut client = Client::new(options)?;
    ///
    /// // Send a message with an image
    /// let msg = Message::user_with_image(
    ///     "What's in this image?",
    ///     "https://example.com/photo.jpg"
    /// )?;
    /// client.send_message(msg).await?;
    ///
    /// // Receive the response
    /// while let Some(_block) = client.receive().await? {
    ///     // Process response blocks
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn send_message(&mut self, message: Message) -> Result<()> {
        // Reset interrupt flag for new query
        // This allows the client to be reused after a previous interruption
        // Uses SeqCst ordering to ensure visibility across all threads
        self.interrupted.store(false, Ordering::SeqCst);

        // Discard any leftover manual-mode blocks from an abandoned stream.
        self.manual_receive_buffer.clear();
        self.current_stream = None;

        // Note: We do NOT run UserPromptSubmit hooks here because:
        // 1. The message is already fully constructed
        // 2. Hooks expect string prompts, not complex Message objects
        // 3. For multimodal messages, there's no single "prompt" to modify

        // Add message to history BEFORE sending request
        // This ensures history consistency even if request fails
        self.history.push(message);

        // The rest of the logic is identical to send() - build and execute request
        // Build messages array for API request
        // This includes system prompt + full conversation history
        let mut messages = Vec::new();

        // Add system prompt as first message if configured
        // System prompts are added fresh for each request (not from history)
        if !self.options.system_prompt().is_empty() {
            messages.push(OpenAIMessage {
                role: "system".to_string(),
                content: Some(OpenAIContent::Text(
                    self.options.system_prompt().to_string(),
                )),
                tool_calls: None,
                tool_call_id: None,
            });
        }

        // Convert conversation history to OpenAI message format
        // This includes user prompts, assistant responses, and tool results
        for msg in &self.history {
            // Separate blocks by type to determine message structure
            let mut text_blocks = Vec::new();
            let mut image_blocks = Vec::new();
            let mut tool_use_blocks = Vec::new();
            let mut tool_result_blocks = Vec::new();

            for block in &msg.content {
                match block {
                    ContentBlock::Text(text) => text_blocks.push(text),
                    ContentBlock::Image(image) => image_blocks.push(image),
                    ContentBlock::ToolUse(tool_use) => tool_use_blocks.push(tool_use),
                    ContentBlock::ToolResult(tool_result) => tool_result_blocks.push(tool_result),
                }
            }

2005 // Case 1: Message contains tool results (should be separate tool messages)
2006 if !tool_result_blocks.is_empty() {
2007 for tool_result in tool_result_blocks {
2008 // Serialize the tool result content as JSON string
2009 let content =
2010 serde_json::to_string(tool_result.content()).unwrap_or_else(|e| {
2011 format!("{{\"error\": \"Failed to serialize: {}\"}}", e)
2012 });
2013
2014 messages.push(OpenAIMessage {
2015 role: "tool".to_string(),
2016 content: Some(OpenAIContent::Text(content)),
2017 tool_calls: None,
2018 tool_call_id: Some(tool_result.tool_use_id().to_string()),
2019 });
2020 }
2021 }
2022 // Case 2: Message contains tool use blocks (assistant with tool calls)
2023 else if !tool_use_blocks.is_empty() {
2024 // Build tool_calls array
2025 let tool_calls: Vec<OpenAIToolCall> = tool_use_blocks
2026 .iter()
2027 .map(|tool_use| {
2028 // Serialize the input as a JSON string (OpenAI API requirement)
2029 let arguments = serde_json::to_string(tool_use.input())
2030 .unwrap_or_else(|_| "{}".to_string());
2031
2032 OpenAIToolCall {
2033 id: tool_use.id().to_string(),
2034 call_type: "function".to_string(),
2035 function: OpenAIFunction {
2036 name: tool_use.name().to_string(),
2037 arguments,
2038 },
2039 }
2040 })
2041 .collect();
2042
2043 // Extract any text content (some models include reasoning before tool calls)
2044 // Note: OpenAI API requires content field even if empty when tool_calls present
2045 let content = if !text_blocks.is_empty() {
2046 let text = text_blocks
2047 .iter()
2048 .map(|t| t.text.as_str())
2049 .collect::<Vec<_>>()
2050 .join("\n");
2051 Some(OpenAIContent::Text(text))
2052 } else {
2053 // Empty string satisfies OpenAI API schema (content is required)
2054 Some(OpenAIContent::Text(String::new()))
2055 };
2056
2057 messages.push(OpenAIMessage {
2058 role: "assistant".to_string(),
2059 content,
2060 tool_calls: Some(tool_calls),
2061 tool_call_id: None,
2062 });
2063 }
            // Case 3: Message contains images (use OpenAIContent::Parts)
            else if !image_blocks.is_empty() {
                // Log debug info about images being serialized
                log::debug!(
                    "Serializing message with {} image(s) for {:?} role",
                    image_blocks.len(),
                    msg.role
                );

                // Build content parts array preserving original order
                let mut content_parts = Vec::new();

                // Re-iterate through content blocks to maintain order
                for block in &msg.content {
                    match block {
                        ContentBlock::Text(text) => {
                            content_parts.push(OpenAIContentPart::text(&text.text));
                        }
                        ContentBlock::Image(image) => {
                            // Log image details (truncate URL for privacy; count chars,
                            // not bytes, so truncation can't split a UTF-8 character)
                            let url_display = if image.url().len() > 100 {
                                let prefix: String = image.url().chars().take(100).collect();
                                format!("{}... ({} chars)", prefix, image.url().len())
                            } else {
                                image.url().to_string()
                            };
                            let detail_str = match image.detail() {
                                crate::types::ImageDetail::Low => "low",
                                crate::types::ImageDetail::High => "high",
                                crate::types::ImageDetail::Auto => "auto",
                            };
                            log::debug!(" - Image: {} (detail: {})", url_display, detail_str);

                            content_parts.push(OpenAIContentPart::from_image(image));
                        }
                        ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
                    }
                }

                // Defensive check: content_parts should never be empty at this point
                // If it is, it indicates a logic error (e.g., all blocks were filtered out)
                if content_parts.is_empty() {
                    return Err(Error::other(
                        "Internal error: Message with images produced empty content array",
                    ));
                }

                let role_str = match msg.role {
                    MessageRole::System => "system",
                    MessageRole::User => "user",
                    MessageRole::Assistant => "assistant",
                    MessageRole::Tool => "tool",
                };

                messages.push(OpenAIMessage {
                    role: role_str.to_string(),
                    content: Some(OpenAIContent::Parts(content_parts)),
                    tool_calls: None,
                    tool_call_id: None,
                });
            }
            // Case 4: Message contains only text (normal message, backward compatible)
            else {
                let content = text_blocks
                    .iter()
                    .map(|t| t.text.as_str())
                    .collect::<Vec<_>>()
                    .join("\n");

                let role_str = match msg.role {
                    MessageRole::System => "system",
                    MessageRole::User => "user",
                    MessageRole::Assistant => "assistant",
                    MessageRole::Tool => "tool",
                };

                messages.push(OpenAIMessage {
                    role: role_str.to_string(),
                    content: Some(OpenAIContent::Text(content)),
                    tool_calls: None,
                    tool_call_id: None,
                });
            }
        }

        // Convert tools to OpenAI format if any are registered
        let tools = if !self.options.tools().is_empty() {
            Some(
                self.options
                    .tools()
                    .iter()
                    .map(|t| t.to_openai_format())
                    .collect(),
            )
        } else {
            None
        };

        // Build the OpenAI-compatible request payload
        let request = OpenAIRequest {
            model: self.options.model().to_string(),
            messages,
            stream: true,
            max_tokens: self.options.max_tokens(),
            temperature: Some(self.options.temperature()),
            tools,
        };

        // Make HTTP POST request to chat completions endpoint
        let url = format!("{}/chat/completions", self.options.base_url());
        let response = self
            .http_client
            .post(&url)
            .header(
                "Authorization",
                format!("Bearer {}", self.options.api_key()),
            )
            .header("Content-Type", "application/json")
            .json(&request)
            .send()
            .await
            .map_err(Error::Http)?;

        // Check for HTTP-level errors
        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_else(|e| {
                eprintln!("WARNING: Failed to read error response body: {}", e);
                "Unknown error (failed to read response body)".to_string()
            });
            return Err(Error::api(format!("API error {}: {}", status, body)));
        }

        // Parse Server-Sent Events stream
        let sse_stream = parse_sse_stream(response);

        // Aggregate SSE chunks into complete content blocks
        let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
            let result = match chunk_result {
                Ok(chunk) => match aggregator.process_chunk(chunk) {
                    Ok(blocks) => {
                        if blocks.is_empty() {
                            Some(None) // Partial chunk
                        } else {
                            Some(Some(Ok(blocks))) // Complete blocks
                        }
                    }
                    Err(e) => Some(Some(Err(e))),
                },
                Err(e) => Some(Some(Err(e))),
            };
            futures::future::ready(result)
        });

        // Flatten nested Options and filter out None values
        let stream = stream.filter_map(futures::future::ready);

        // Flatten Vec<ContentBlock> into individual blocks
        let stream = stream.flat_map(|result| {
            futures::stream::iter(match result {
                Ok(blocks) => blocks.into_iter().map(Ok).collect(),
                Err(e) => vec![Err(e)],
            })
        });

        // Store the content stream for receive() to consume
        self.current_stream = Some(Box::pin(stream));

        Ok(())
    }

    /// Receives the next content block from the model's response.
    ///
    /// Returns `Ok(Some(block))` while content is available, `Ok(None)` once the
    /// stream ends or was interrupted, and `Err(e)` if streaming or tool execution
    /// fails. In manual mode this yields blocks as they arrive; with
    /// `auto_execute_tools` enabled it runs the tool-execution loop first and then
    /// returns the buffered final text blocks one at a time.
    pub async fn receive(&mut self) -> Result<Option<ContentBlock>> {
        // ========================================================================
        // AUTO-EXECUTION MODE
        // ========================================================================
        if self.options.auto_execute_tools() {
            // Check if we have buffered blocks to return
            // In auto mode, all final text blocks are buffered and returned one at a time
            if self.auto_exec_index < self.auto_exec_buffer.len() {
                // Return next buffered block
                let block = self.auto_exec_buffer[self.auto_exec_index].clone();
                self.auto_exec_index += 1;
                return Ok(Some(block));
            }

            // No buffered blocks - need to run auto-execution loop
            // This only happens on the first receive() call after send()
            if self.auto_exec_buffer.is_empty() {
                match self.auto_execute_loop().await {
                    Ok(blocks) => {
                        // Buffer all final text blocks
                        self.auto_exec_buffer = blocks;
                        self.auto_exec_index = 0;

                        // If no blocks, return None (empty response)
                        if self.auto_exec_buffer.is_empty() {
                            return Ok(None);
                        }

                        // Return first buffered block
                        let block = self.auto_exec_buffer[0].clone();
                        self.auto_exec_index = 1;
                        return Ok(Some(block));
                    }
                    Err(e) => return Err(e),
                }
            }

            // Buffer exhausted - return None
            Ok(None)
        } else {
            // ====================================================================
            // MANUAL MODE
            // ====================================================================
            // Stream blocks to caller while accumulating them so we can add
            // the complete assistant message to history when the stream ends.
            match self.receive_one().await {
                Err(e) => {
                    // Stream error: discard partial output so a retry
                    // doesn't flush truncated blocks into history.
                    self.manual_receive_buffer.clear();
                    Err(e)
                }
                Ok(Some(block)) => {
                    self.manual_receive_buffer.push(block.clone());
                    Ok(Some(block))
                }
                Ok(None) => {
                    if self.interrupted.load(Ordering::SeqCst) && self.current_stream.is_some() {
                        // Interrupted a live stream: discard partial output.
                        // current_stream is still Some because receive_one()
                        // only clears it on natural EOF, not on interrupt.
                        self.current_stream = None;
                        self.manual_receive_buffer.clear();
                    } else if !self.manual_receive_buffer.is_empty() {
                        // Either natural EOF or an interrupt after the stream already
                        // finished: commit the (complete) assistant message.
                        let blocks = std::mem::take(&mut self.manual_receive_buffer);
                        self.history.push(Message::assistant(blocks));
                    }
                    Ok(None)
                }
            }
        }
    }

    /// Interrupts the current operation by setting the interrupt flag.
    ///
    /// This method provides a thread-safe way to cancel any in-progress streaming
    /// operation. The interrupt flag is checked by `receive()` before each block,
    /// allowing responsive cancellation.
    ///
    /// # Behavior
    ///
    /// - Sets the atomic interrupt flag to `true`
    /// - Next `receive()` call will return `Ok(None)` and clear the stream
    /// - Flag is automatically reset to `false` on next `send()` call
    /// - Safe to call from any thread (uses atomic operations)
    /// - Idempotent: calling multiple times has same effect as calling once
    /// - No-op if no operation is in progress
    ///
    /// # Thread Safety
    ///
    /// This method uses `Arc<AtomicBool>` internally, which can be safely shared
    /// across threads. You can clone the interrupt handle and use it from different
    /// threads or async tasks:
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    /// let interrupt_handle = client.interrupt_handle();
    ///
    /// // Use from another thread
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # State Changes
    ///
    /// - Sets `interrupted` flag to `true`
    /// - Does NOT modify stream, history, or other state directly
    /// - Effect takes place on next `receive()` call
    ///
    /// # Use Cases
    ///
    /// - User cancellation (e.g., stop button in UI)
    /// - Timeout enforcement
    /// - Resource cleanup
    /// - Emergency shutdown
    ///
    /// # Examples
    ///
    /// ## Basic Interruption
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// client.send("Tell me a long story").await?;
    ///
    /// // Interrupt after receiving some blocks
    /// let mut count = 0;
    /// while let Some(_block) = client.receive().await? {
    ///     count += 1;
    ///     if count >= 5 {
    ///         client.interrupt();
    ///     }
    /// }
    ///
    /// // Client is ready for new queries
    /// client.send("What's 2+2?").await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## With Timeout
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions};
    /// use std::time::Duration;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// client.send("Long request").await?;
    ///
    /// // Spawn timeout task
    /// let interrupt_handle = client.interrupt_handle();
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(Duration::from_secs(10)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    ///
    /// while let Some(_block) = client.receive().await? {
    ///     // Process until timeout
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn interrupt(&self) {
        // Set interrupt flag using SeqCst for immediate visibility across all threads
        self.interrupted.store(true, Ordering::SeqCst);
    }

    /// Returns a clone of the interrupt handle for thread-safe cancellation.
    ///
    /// This method provides access to the shared `Arc<AtomicBool>` interrupt flag,
    /// allowing it to be used from other threads or async tasks to signal cancellation.
    ///
    /// # Returns
    ///
    /// A cloned `Arc<AtomicBool>` that can be used to interrupt operations from any thread.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    /// let interrupt_handle = client.interrupt_handle();
    ///
    /// // Use from another thread
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///     interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
    /// });
    /// # Ok(())
    /// # }
    /// ```
    pub fn interrupt_handle(&self) -> Arc<AtomicBool> {
        self.interrupted.clone()
    }

    /// Returns a reference to the conversation history.
    ///
    /// The history contains all messages exchanged in the conversation, including:
    /// - User messages
    /// - Assistant messages (with text and tool use blocks)
    /// - Tool result messages
    ///
    /// # Returns
    ///
    /// A slice of `Message` objects in chronological order.
    ///
    /// # Use Cases
    ///
    /// - Inspecting conversation context
    /// - Debugging tool execution flow
    /// - Saving conversation state
    /// - Implementing custom history management
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::default())?;
    ///
    /// // Initially empty
    /// assert_eq!(client.history().len(), 0);
    /// # Ok(())
    /// # }
    /// ```
    pub fn history(&self) -> &[Message] {
        &self.history
    }

    /// Returns a mutable reference to the conversation history.
    ///
    /// This allows you to modify the history directly for advanced use cases like:
    /// - Removing old messages to manage context length
    /// - Editing messages for retry scenarios
    /// - Injecting synthetic messages for testing
    ///
    /// # Warning
    ///
    /// Modifying history directly can lead to inconsistent conversation state if not
    /// done carefully. The SDK expects history to follow the proper message flow
    /// (user → assistant → tool results → assistant, etc.).
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// // Remove oldest messages to stay within context limit
    /// if client.history().len() > 50 {
    ///     client.history_mut().drain(0..10);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn history_mut(&mut self) -> &mut Vec<Message> {
        &mut self.history
    }

    /// Returns a reference to the agent configuration options.
    ///
    /// Provides read-only access to the `AgentOptions` used to configure this client.
    ///
    /// # Use Cases
    ///
    /// - Inspecting current configuration
    /// - Debugging issues
    /// - Conditional logic based on settings
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use open_agent::{Client, AgentOptions};
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::new(AgentOptions::builder()
    ///     .model("gpt-4")
    ///     .base_url("http://localhost:1234/v1")
    ///     .build()?)?;
    ///
    /// println!("Using model: {}", client.options().model());
    /// # Ok(())
    /// # }
    /// ```
    pub fn options(&self) -> &AgentOptions {
        &self.options
    }

    /// Clears all conversation history.
    ///
    /// This resets the conversation to a blank slate while preserving the client
    /// configuration (tools, hooks, model, etc.). The next message will start a
    /// fresh conversation with no prior context.
    ///
    /// # State Changes
    ///
    /// - Clears the `history` vector
    /// - Clears any partially buffered manual-mode blocks
    /// - Does NOT modify current stream, options, or other state
    ///
    /// # Use Cases
    ///
    /// - Starting a new conversation
    /// - Preventing context length issues
    /// - Clearing sensitive data
    /// - Implementing conversation sessions
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use open_agent::{Client, AgentOptions, ContentBlock};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    ///
    /// // First conversation
    /// client.send("Hello").await?;
    /// while let Some(_) = client.receive().await? {}
    ///
    /// // Clear and start fresh
    /// client.clear_history();
    ///
    /// // New conversation with no memory of previous
    /// client.send("Hello again").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn clear_history(&mut self) {
        self.history.clear();
        self.manual_receive_buffer.clear();
    }

    /// Adds a tool result to the conversation history for manual tool execution.
    ///
    /// This method is used exclusively in **manual mode** after receiving a `ToolUseBlock`.
    /// The workflow is:
    ///
    /// 1. `receive()` returns a `ToolUseBlock`
    /// 2. You execute the tool yourself
    /// 3. Call `add_tool_result()` with the tool's output
    /// 4. Call `send("")` to continue the conversation
    /// 5. The model receives the tool result and generates a response
    ///
    /// # Parameters
    ///
    /// - `tool_use_id`: The unique ID from the `ToolUseBlock` (must match exactly)
    /// - `content`: The tool's output as a JSON value
    ///
    /// # Behavior
    ///
    /// Creates a `ToolResultBlock` and adds it to conversation history as a tool message.
    /// This preserves the tool call/result pairing that the model needs to understand
    /// the conversation flow.
    ///
    /// # State Changes
    ///
    /// - Appends a tool message to `history`
    /// - Does NOT modify stream or trigger any requests
    ///
    /// # Important Notes
    ///
    /// - **Not used in auto mode**: Auto-execution handles tool results automatically
    /// - **ID must match**: The `tool_use_id` must match the ID from the `ToolUseBlock`
    /// - **No validation**: This method doesn't validate the result format
    /// - **Must call send()**: After adding result(s), call `send("")` to continue
    ///
    /// # Examples
    ///
    /// ## Basic Manual Tool Execution
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Use the calculator").await?;
    ///
    /// while let Some(block) = client.receive().await? {
    ///     match block {
    ///         ContentBlock::ToolUse(tool_use) => {
    ///             // Execute tool manually
    ///             let result = json!({"result": 42});
    ///
    ///             // Add result to history
    ///             client.add_tool_result(tool_use.id(), result)?;
    ///
    ///             // Continue conversation to get model's response
    ///             client.send("").await?;
    ///         }
    ///         ContentBlock::Text(text) => {
    ///             println!("{}", text.text);
    ///         }
    ///         ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Handling Tool Errors
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// # client.send("test").await?;
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         // Try to execute tool
    ///         let result = match execute_tool(tool_use.name(), tool_use.input()) {
    ///             Ok(output) => output,
    ///             Err(e) => json!({
    ///                 "error": e.to_string(),
    ///                 "tool": tool_use.name()
    ///             })
    ///         };
    ///
    ///         client.add_tool_result(tool_use.id(), result)?;
    ///         client.send("").await?;
    ///     }
    /// }
    ///
    /// # fn execute_tool(name: &str, input: &serde_json::Value) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
    /// #     Ok(json!({}))
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## Multiple Tool Calls
    ///
    /// ```rust,no_run
    /// use open_agent::{Client, AgentOptions, ContentBlock};
    /// use serde_json::json;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let mut client = Client::new(AgentOptions::default())?;
    /// client.send("Calculate 2+2 and 3+3").await?;
    ///
    /// let mut tool_calls = Vec::new();
    ///
    /// // Collect all tool calls
    /// while let Some(block) = client.receive().await? {
    ///     if let ContentBlock::ToolUse(tool_use) = block {
    ///         tool_calls.push(tool_use);
    ///     }
    /// }
    ///
    /// // Execute and add results for all tools
    /// for tool_call in tool_calls {
    ///     let result = json!({"result": 42}); // Execute tool
    ///     client.add_tool_result(tool_call.id(), result)?;
    /// }
    ///
    /// // Continue conversation
    /// client.send("").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn add_tool_result(&mut self, tool_use_id: &str, content: serde_json::Value) -> Result<()> {
        use crate::types::ToolResultBlock;

        // Flush any buffered manual-mode blocks first so the assistant's
        // tool_calls message appears in history before this tool result.
        if !self.manual_receive_buffer.is_empty() {
            let blocks = std::mem::take(&mut self.manual_receive_buffer);
            self.history.push(Message::assistant(blocks));
        }

        // Create a tool result block with the given ID and content
        let result_block = ToolResultBlock::new(tool_use_id, content);

        // Add to history as a tool message
        // Note: ToolResultBlock is properly serialized in build_api_request()
        // as a separate message with role="tool" and tool_call_id set
        let serialized = serde_json::to_string(result_block.content())
            .map_err(|e| Error::config(format!("Failed to serialize tool result: {}", e)))?;

        self.history.push(Message::new(
            MessageRole::Tool,
            vec![ContentBlock::Text(TextBlock::new(serialized))],
        ));

        Ok(())
    }

2735 /// Looks up a registered tool by name.
2736 ///
2737 /// This method provides access to the tool registry for manual execution scenarios.
2738 /// It searches the tools registered in `AgentOptions` and returns a reference to
2739 /// the matching tool if found.
2740 ///
2741 /// # Parameters
2742 ///
2743 /// - `name`: The tool name to search for (case-sensitive)
2744 ///
2745 /// # Returns
2746 ///
2747 /// - `Some(&Tool)`: Tool found
2748 /// - `None`: No tool with that name
2749 ///
2750 /// # Use Cases
2751 ///
2752 /// - Manual tool execution in response to `ToolUseBlock`
2753 /// - Validating tool availability before offering features
2754 /// - Inspecting tool metadata (name, description, schema)
2755 ///
2756 /// # Examples
2757 ///
2758 /// ## Execute Tool Manually
2759 ///
2760 /// ```rust,no_run
2761 /// use open_agent::{Client, AgentOptions, ContentBlock};
2762 ///
2763 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2764 /// # let mut client = Client::new(AgentOptions::default())?;
2765 /// # client.send("test").await?;
2766 /// while let Some(block) = client.receive().await? {
2767 /// if let ContentBlock::ToolUse(tool_use) = block {
2768 /// if let Some(tool) = client.get_tool(tool_use.name()) {
2769 /// // Execute the tool
2770 /// let result = tool.execute(tool_use.input().clone()).await?;
2771 /// client.add_tool_result(tool_use.id(), result)?;
2772 /// client.send("").await?;
2773 /// } else {
2774 /// println!("Unknown tool: {}", tool_use.name());
2775 /// }
2776 /// }
2777 /// }
2778 /// # Ok(())
2779 /// # }
2780 /// ```
2781 ///
2782 /// ## Check Tool Availability
2783 ///
    /// ```rust,no_run
2785 /// # use open_agent::{Client, AgentOptions};
2786 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2787 /// # let client = Client::new(AgentOptions::default())?;
2788 /// if client.get_tool("calculator").is_some() {
2789 /// println!("Calculator is available");
2790 /// }
2791 /// # Ok(())
2792 /// # }
2793 /// ```
2794 pub fn get_tool(&self, name: &str) -> Option<&crate::tools::Tool> {
2795 // Search registered tools by name
2796 self.options
2797 .tools()
2798 .iter()
2799 .find(|t| t.name() == name)
2800 .map(|t| t.as_ref())
2801 }
2802}
2803
2804#[cfg(test)]
2805mod tests {
2806 use super::*;
2807
2808 #[test]
2809 fn test_client_creation() {
2810 let options = AgentOptions::builder()
2811 .system_prompt("Test")
2812 .model("test-model")
2813 .base_url("http://localhost:1234/v1")
2814 .build()
2815 .unwrap();
2816
2817 let client = Client::new(options).expect("Should create client successfully");
2818 assert_eq!(client.history().len(), 0);
2819 }
2820
2821 #[test]
2822 fn test_client_new_returns_result() {
2823 // Test that Client::new() returns Result instead of panicking
2824 let options = AgentOptions::builder()
2825 .system_prompt("Test")
2826 .model("test-model")
2827 .base_url("http://localhost:1234/v1")
2828 .build()
2829 .unwrap();
2830
2831 // This should not panic - it should return Ok(client)
2832 let result = Client::new(options);
2833 assert!(result.is_ok(), "Client::new() should return Ok");
2834
2835 let client = result.unwrap();
2836 assert_eq!(client.history().len(), 0);
2837 }
2838
2839 #[test]
2840 fn test_interrupt_flag_initial_state() {
2841 let options = AgentOptions::builder()
2842 .system_prompt("Test")
2843 .model("test-model")
2844 .base_url("http://localhost:1234/v1")
2845 .build()
2846 .unwrap();
2847
2848 let client = Client::new(options).expect("Should create client successfully");
2849 // Initially not interrupted
2850 assert!(!client.interrupted.load(Ordering::SeqCst));
2851 }
2852
2853 #[test]
2854 fn test_interrupt_sets_flag() {
2855 let options = AgentOptions::builder()
2856 .system_prompt("Test")
2857 .model("test-model")
2858 .base_url("http://localhost:1234/v1")
2859 .build()
2860 .unwrap();
2861
2862 let client = Client::new(options).expect("Should create client successfully");
2863 client.interrupt();
2864 assert!(client.interrupted.load(Ordering::SeqCst));
2865 }
2866
2867 #[test]
2868 fn test_interrupt_idempotent() {
2869 let options = AgentOptions::builder()
2870 .system_prompt("Test")
2871 .model("test-model")
2872 .base_url("http://localhost:1234/v1")
2873 .build()
2874 .unwrap();
2875
2876 let client = Client::new(options).expect("Should create client successfully");
2877 client.interrupt();
2878 assert!(client.interrupted.load(Ordering::SeqCst));
2879
2880 // Call again - should still be interrupted
2881 client.interrupt();
2882 assert!(client.interrupted.load(Ordering::SeqCst));
2883 }
2884
2885 #[tokio::test]
2886 async fn test_receive_returns_none_when_interrupted() {
2887 let options = AgentOptions::builder()
2888 .system_prompt("Test")
2889 .model("test-model")
2890 .base_url("http://localhost:1234/v1")
2891 .build()
2892 .unwrap();
2893
2894 let mut client = Client::new(options).expect("Should create client successfully");
2895
2896 // Interrupt before receiving
2897 client.interrupt();
2898
2899 // NEW SIGNATURE: receive() should return Ok(None) when interrupted
2900 let result = client.receive().await;
2901 assert!(result.is_ok());
2902 assert!(result.unwrap().is_none());
2903 }
2904
2905 #[tokio::test]
2906 async fn test_receive_returns_ok_none_when_no_stream() {
2907 let options = AgentOptions::builder()
2908 .system_prompt("Test")
2909 .model("test-model")
2910 .base_url("http://localhost:1234/v1")
2911 .build()
2912 .unwrap();
2913
2914 let mut client = Client::new(options).expect("Should create client successfully");
2915
2916 // No stream started - receive() should return Ok(None)
2917 let result = client.receive().await;
2918 assert!(result.is_ok());
2919 assert!(result.unwrap().is_none());
2920 }
2921
2922 #[tokio::test]
2923 async fn test_receive_error_propagation() {
2924 // This test demonstrates that errors are wrapped in Err(), not Some(Err())
2925 // We'll verify this behavior when we have a mock stream that produces errors
2926 let options = AgentOptions::builder()
2927 .system_prompt("Test")
2928 .model("test-model")
2929 .base_url("http://localhost:1234/v1")
2930 .build()
2931 .unwrap();
2932
        let mut client = Client::new(options).expect("Should create client successfully");

        // Signature check: receive() returns Result<Option<ContentBlock>>,
        // which lets callers use the ? operator cleanly:
        //   while let Some(block) = client.receive().await? { ... }

        // Assert the signature against the real method. No stream is set,
        // so this resolves to Ok(None) without any network I/O.
        let _: Result<Option<ContentBlock>> = client.receive().await;
2942 }
2943
2944 #[tokio::test]
2945 async fn test_manual_mode_adds_assistant_to_history() {
2946 // Issue #4: Manual mode receive() should add assistant messages to history
2947 let options = AgentOptions::builder()
2948 .system_prompt("Test")
2949 .model("test-model")
2950 .base_url("http://localhost:1234/v1")
2951 .build()
2952 .unwrap();
2953
2954 let mut client = Client::new(options).expect("Should create client successfully");
2955
2956 // Simulate a user message in history
2957 client
2958 .history
2959 .push(Message::user("What's the capital of France?"));
2960
2961 // Inject a fake stream with two text blocks
2962 let blocks = vec![
2963 Ok(ContentBlock::Text(TextBlock::new("Paris is"))),
2964 Ok(ContentBlock::Text(TextBlock::new(
2965 " the capital of France.",
2966 ))),
2967 ];
2968 let stream = futures::stream::iter(blocks);
2969 client.current_stream = Some(Box::pin(stream));
2970
2971 // Consume the stream via receive()
2972 let mut received = Vec::new();
2973 while let Some(block) = client.receive().await.unwrap() {
2974 received.push(block);
2975 }
2976
2977 // Should have received 2 blocks
2978 assert_eq!(received.len(), 2);
2979
2980 // History should now have 2 messages: user + assistant
2981 assert_eq!(client.history().len(), 2);
2982 assert_eq!(client.history()[0].role, MessageRole::User);
2983 assert_eq!(client.history()[1].role, MessageRole::Assistant);
2984
2985 // Assistant message should contain both text blocks
2986 assert_eq!(client.history()[1].content.len(), 2);
2987 }
2988
2989 #[tokio::test]
2990 async fn test_manual_mode_empty_stream_no_assistant_message() {
2991 // If the stream is empty, no assistant message should be added
2992 let options = AgentOptions::builder()
2993 .system_prompt("Test")
2994 .model("test-model")
2995 .base_url("http://localhost:1234/v1")
2996 .build()
2997 .unwrap();
2998
2999 let mut client = Client::new(options).expect("Should create client successfully");
3000
3001 // No stream set, receive returns None
3002 let result = client.receive().await.unwrap();
3003 assert!(result.is_none());
3004
3005 // History should remain empty — no spurious assistant message
3006 assert_eq!(client.history().len(), 0);
3007 }
3008
3009 #[tokio::test]
3010 async fn test_manual_mode_tool_call_flushed_on_send() {
3011 // P1: When receive() yields a ToolUseBlock and the caller then calls
3012 // send(""), the buffered assistant turn must be flushed to history
3013 // so the tool result has a matching tool_calls message.
3014 let options = AgentOptions::builder()
3015 .system_prompt("Test")
3016 .model("test-model")
3017 .base_url("http://localhost:1234/v1")
3018 .build()
3019 .unwrap();
3020
3021 let mut client = Client::new(options).expect("Should create client successfully");
3022
3023 // Simulate: user sends, receives a tool call block, stream ends mid-turn
3024 client.history.push(Message::user("Calculate 2+2"));
3025
3026 let tool_use =
3027 crate::types::ToolUseBlock::new("call_1", "calculator", serde_json::json!({"a": 2}));
3028 let blocks = vec![Ok(ContentBlock::ToolUse(tool_use))];
3029 client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));
3030
3031 // Caller consumes the tool use block
3032 let block = client.receive().await.unwrap().unwrap();
3033 assert!(matches!(block, ContentBlock::ToolUse(_)));
3034
3035 // Buffer should hold the block but history should NOT have assistant yet
3036 assert_eq!(client.manual_receive_buffer.len(), 1);
3037 assert_eq!(client.history().len(), 1); // only user message
3038
3039 // Stream ends — receive returns None, but buffer is NOT flushed yet
3040 // because the caller hasn't finished the tool flow
3041 // (receive_one returns None since stream is exhausted)
3042
3043 // Caller adds tool result — this should flush the buffer first,
3044 // then add the tool result, giving correct ordering.
3045 client
3046 .add_tool_result("call_1", serde_json::json!({"result": 4}))
3047 .unwrap();
3048
3049 // History should now be: user, assistant(tool_call), tool_result
3050 assert_eq!(client.history().len(), 3);
3051 assert_eq!(client.history()[0].role, MessageRole::User);
3052 assert_eq!(client.history()[1].role, MessageRole::Assistant);
3053 assert!(matches!(
3054 client.history()[1].content[0],
3055 ContentBlock::ToolUse(_)
3056 ));
3057 assert_eq!(client.history()[2].role, MessageRole::Tool);
3058 assert!(client.manual_receive_buffer.is_empty());
3059 }
3060
3061 #[tokio::test]
3062 async fn test_manual_mode_interrupt_discards_buffer() {
3063 // P2: Interrupted streams should NOT commit partial output to history
3064 let options = AgentOptions::builder()
3065 .system_prompt("Test")
3066 .model("test-model")
3067 .base_url("http://localhost:1234/v1")
3068 .build()
3069 .unwrap();
3070
3071 let mut client = Client::new(options).expect("Should create client successfully");
3072
3073 client.history.push(Message::user("Tell me a story"));
3074
3075 // Inject a stream, consume one block, then interrupt
3076 let blocks = vec![
3077 Ok(ContentBlock::Text(TextBlock::new("Once upon"))),
3078 Ok(ContentBlock::Text(TextBlock::new(" a time..."))),
3079 ];
3080 client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));
3081
3082 // Read one block
3083 let block = client.receive().await.unwrap().unwrap();
3084 assert!(matches!(block, ContentBlock::Text(_)));
3085 assert_eq!(client.manual_receive_buffer.len(), 1);
3086
3087 // Interrupt mid-stream
3088 client.interrupt();
3089
3090 // Next receive should return None and discard the buffer
3091 let result = client.receive().await.unwrap();
3092 assert!(result.is_none());
3093
3094 // History should only have the user message — no partial assistant
3095 assert_eq!(client.history().len(), 1);
3096 assert_eq!(client.history()[0].role, MessageRole::User);
3097 assert!(client.manual_receive_buffer.is_empty());
3098 }
3099
3100 #[tokio::test]
3101 async fn test_manual_mode_interrupt_after_eof_commits() {
3102 // P2 (round 2): If all blocks were delivered and the stream ended
3103 // naturally, an interrupt that fires before the next receive() should
3104 // still commit the complete response to history.
3105 let options = AgentOptions::builder()
3106 .system_prompt("Test")
3107 .model("test-model")
3108 .base_url("http://localhost:1234/v1")
3109 .build()
3110 .unwrap();
3111
3112 let mut client = Client::new(options).expect("Should create client successfully");
3113 client.history.push(Message::user("Hello"));
3114
3115 // Stream with one block
3116 let blocks = vec![Ok(ContentBlock::Text(TextBlock::new("Hi there!")))];
3117 client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));
3118
3119 // Consume the block
3120 let block = client.receive().await.unwrap().unwrap();
3121 assert!(matches!(block, ContentBlock::Text(_)));
3122
3123 // Consume EOF — stream ends normally, buffer committed
3124 let eof = client.receive().await.unwrap();
3125 assert!(eof.is_none());
3126
3127 // Verify current_stream is None (natural EOF)
3128 assert!(client.current_stream.is_none());
3129
3130 // History: user + assistant
3131 assert_eq!(client.history().len(), 2);
3132 assert_eq!(client.history()[1].role, MessageRole::Assistant);
3133 }
3134
3135 #[tokio::test]
3136 async fn test_manual_mode_send_discards_unfinished_stream() {
3137 // P1 (round 2): If the caller calls send() before the stream is
3138 // fully consumed, the partial buffer must be discarded, not committed.
3139 let options = AgentOptions::builder()
3140 .system_prompt("Test")
3141 .model("test-model")
3142 .base_url("http://localhost:1234/v1")
3143 .build()
3144 .unwrap();
3145
3146 let mut client = Client::new(options).expect("Should create client successfully");
3147 client.history.push(Message::user("Tell me everything"));
3148
3149 // Stream with many blocks — caller will only read the first
3150 let blocks = vec![
3151 Ok(ContentBlock::Text(TextBlock::new("First"))),
3152 Ok(ContentBlock::Text(TextBlock::new("Second"))),
3153 Ok(ContentBlock::Text(TextBlock::new("Third"))),
3154 ];
3155 client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));
3156
3157 // Read only the first block
3158 let block = client.receive().await.unwrap().unwrap();
3159 assert!(matches!(block, ContentBlock::Text(_)));
3160 assert_eq!(client.manual_receive_buffer.len(), 1);
3161
3162 // Verify buffer and stream are cleared — we can't call send()
3163 // (no server), so verify the discard logic directly.
3164 client.manual_receive_buffer.clear();
3165 client.current_stream = None;
3166
3167 // History should only have the user message — no partial assistant
3168 assert_eq!(client.history().len(), 1);
3169 }
3170
3171 #[tokio::test]
3172 async fn test_manual_mode_error_discards_buffer() {
3173 // P1: Stream errors should discard partial output, not leave it
3174 // to be flushed on the next send().
3175 let options = AgentOptions::builder()
3176 .system_prompt("Test")
3177 .model("test-model")
3178 .base_url("http://localhost:1234/v1")
3179 .build()
3180 .unwrap();
3181
3182 let mut client = Client::new(options).expect("Should create client successfully");
3183 client.history.push(Message::user("Hello"));
3184
3185 // Stream yields one block then errors
3186 let blocks: Vec<Result<ContentBlock>> = vec![
3187 Ok(ContentBlock::Text(TextBlock::new("Partial"))),
3188 Err(Error::stream("connection reset")),
3189 ];
3190 client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));
3191
3192 // First receive succeeds
3193 let block = client.receive().await.unwrap().unwrap();
3194 assert!(matches!(block, ContentBlock::Text(_)));
3195 assert_eq!(client.manual_receive_buffer.len(), 1);
3196
3197 // Second receive hits the error — buffer should be cleared
3198 let err = client.receive().await.unwrap_err();
3199 assert!(err.to_string().contains("connection reset"));
3200 assert!(client.manual_receive_buffer.is_empty());
3201
3202 // History should only have the user message
3203 assert_eq!(client.history().len(), 1);
3204 }
3205
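    // A minimal complementary check (a sketch assuming the same internal test
    // APIs used above): with nothing in the manual buffer, add_tool_result()
    // should append exactly one tool-role message and flush no assistant turn.
    #[test]
    fn test_add_tool_result_without_buffer_adds_single_tool_message() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let mut client = Client::new(options).expect("Should create client successfully");
        client
            .add_tool_result("call_1", serde_json::json!({"result": 4}))
            .unwrap();

        // History holds just the tool result — nothing was buffered to flush.
        assert_eq!(client.history().len(), 1);
        assert_eq!(client.history()[0].role, MessageRole::Tool);
        assert!(client.manual_receive_buffer.is_empty());
    }
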
3206 #[tokio::test]
3207 async fn test_clear_history_also_clears_manual_buffer() {
3208 // P2: clear_history() must also clear the manual buffer so a
3209 // "blank slate" conversation doesn't replay old assistant output.
3210 let options = AgentOptions::builder()
3211 .system_prompt("Test")
3212 .model("test-model")
3213 .base_url("http://localhost:1234/v1")
3214 .build()
3215 .unwrap();
3216
3217 let mut client = Client::new(options).expect("Should create client successfully");
3218 client.history.push(Message::user("Hello"));
3219
3220 // Inject stream, consume one block (buffer has content)
3221 let blocks = vec![Ok(ContentBlock::Text(TextBlock::new("Hi there")))];
3222 client.current_stream = Some(Box::pin(futures::stream::iter(blocks)));
3223 client.receive().await.unwrap();
3224 assert_eq!(client.manual_receive_buffer.len(), 1);
3225
3226 // Clear history — buffer must also be cleared
3227 client.clear_history();
3228 assert!(client.history().is_empty());
3229 assert!(client.manual_receive_buffer.is_empty());
3230 }
3231
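    // Sketch of a lookup-miss check for get_tool(), assuming the builder setup
    // used throughout this module registers no tools by default.
    #[test]
    fn test_get_tool_unknown_name_returns_none() {
        let options = AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap();

        let client = Client::new(options).expect("Should create client successfully");
        // No tools were registered, so any lookup should miss.
        assert!(client.get_tool("calculator").is_none());
    }
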
3232 #[test]
3233 fn test_empty_content_parts_protection() {
3234 // Test for Issue #3 - Verify empty content_parts causes appropriate handling
3235 // This documents expected behavior: messages with images should have content
3236
3237 use crate::types::{ContentBlock, ImageBlock, Message, MessageRole};
3238
3239 // GIVEN: Message with an image
3240 let img = ImageBlock::from_url("https://example.com/test.jpg").expect("Valid URL");
3241
3242 let msg = Message::new(MessageRole::User, vec![ContentBlock::Image(img)]);
3243
3244 // WHEN: Building content_parts
3245 let mut content_parts = Vec::new();
3246 for block in &msg.content {
3247 match block {
3248 ContentBlock::Text(text) => {
3249 content_parts.push(crate::types::OpenAIContentPart::text(&text.text));
3250 }
3251 ContentBlock::Image(image) => {
3252 content_parts.push(crate::types::OpenAIContentPart::from_image(image));
3253 }
3254 ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
3255 }
3256 }
3257
3258 // THEN: content_parts should not be empty
3259 assert!(
3260 !content_parts.is_empty(),
3261 "Messages with images should produce non-empty content_parts"
3262 );
3263 }
3264}