// open_agent/client.rs
1//! Client for streaming queries and multi-turn conversations
2//!
3//! This module provides the core streaming client implementation for the Open Agent SDK.
4//! It handles communication with OpenAI-compatible APIs, manages conversation history,
5//! and provides two modes of operation: manual and automatic tool execution.
6//!
7//! # Architecture Overview
8//!
9//! The SDK implements a **streaming-first architecture** where all responses from the model
10//! are received as a stream of content blocks. This design enables:
11//!
12//! - **Progressive rendering**: Display text as it arrives without waiting for completion
13//! - **Real-time tool execution**: Execute tools as they're requested by the model
14//! - **Interruption support**: Cancel operations mid-stream without corrupting state
15//! - **Memory efficiency**: Process large responses without buffering everything in memory
16//!
17//! ## Two Operating Modes
18//!
19//! ### 1. Manual Tool Execution Mode (default)
20//!
21//! In manual mode, the client streams content blocks directly to the caller. When the model
22//! requests a tool, the caller receives a `ToolUseBlock`, executes the tool, adds the result
23//! using `add_tool_result()`, and continues the conversation with another `send()` call.
24//!
25//! **Use cases**: Custom tool execution logic, interactive debugging, fine-grained control
26//!
27//! ### 2. Automatic Tool Execution Mode
28//!
29//! When `auto_execute_tools` is enabled, the client automatically executes tools and continues
30//! the conversation until receiving a text-only response. The caller only receives the final
31//! text blocks after all tool iterations complete.
32//!
33//! **Use cases**: Simple agentic workflows, automated task completion, batch processing
34//!
35//! ## Request Flow
36//!
37//! ```text
38//! User sends prompt
39//! │
40//! ├─> UserPromptSubmit hook executes (can modify/block prompt)
41//! │
42//! ├─> Prompt added to history
43//! │
44//! ├─> HTTP request to OpenAI-compatible API
45//! │
46//! ├─> Response streamed as Server-Sent Events (SSE)
47//! │
48//! ├─> SSE chunks aggregated into ContentBlocks
49//! │
50//! └─> Blocks emitted to caller (or buffered for auto-execution)
51//! ```
52//!
53//! ## Tool Execution Flow
54//!
55//! ### Manual Mode:
56//! ```text
57//! receive() → ToolUseBlock
58//! │
59//! ├─> Caller executes tool
60//! │
61//! ├─> Caller calls add_tool_result()
62//! │
63//! ├─> Caller calls send("") to continue
64//! │
65//! └─> receive() → TextBlock (model's response)
66//! ```
67//!
68//! ### Auto Mode:
69//! ```text
70//! receive() triggers auto-execution loop
71//! │
72//! ├─> Collect all blocks from stream
73//! │
74//! ├─> For each ToolUseBlock:
75//! │ ├─> PreToolUse hook executes (can modify/block)
76//! │ ├─> Tool executed via Tool.execute()
77//! │ ├─> PostToolUse hook executes (can modify result)
78//! │ └─> Result added to history
79//! │
80//! ├─> Continue conversation with send("")
81//! │
82//! ├─> Repeat until text-only response or max iterations
83//! │
84//! └─> Return text blocks one-by-one via receive()
85//! ```
86//!
87//! ## State Management
88//!
89//! The client maintains several pieces of state:
90//!
91//! - **history**: Full conversation history (`Vec<Message>`)
92//! - **current_stream**: Active SSE stream being consumed (`Option<ContentStream>`)
93//! - **interrupted**: Atomic flag for cancellation (`Arc<AtomicBool>`)
94//! - **auto_exec_buffer**: Buffered blocks for auto-execution mode (`Vec<ContentBlock>`)
95//! - **auto_exec_index**: Current position in buffer (usize)
96//!
97//! ## Interruption Mechanism
98//!
99//! The interrupt system uses `Arc<AtomicBool>` to enable safe, thread-safe cancellation:
100//!
101//! ```rust,no_run
102//! # use open_agent::{Client, AgentOptions};
103//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
104//! let mut client = Client::new(AgentOptions::default())?;
105//! let handle = client.interrupt_handle(); // Clone Arc for use in other threads
106//!
107//! // In another thread or async task:
108//! tokio::spawn(async move {
109//! tokio::time::sleep(std::time::Duration::from_secs(5)).await;
110//! handle.store(true, std::sync::atomic::Ordering::SeqCst);
111//! });
112//!
113//! client.send("Long request").await?;
114//! while let Some(block) = client.receive().await? {
115//! // Will stop when interrupted
116//! }
117//! # Ok(())
118//! # }
119//! ```
120//!
121//! ## Hook Integration
122//!
123//! Hooks provide extension points throughout the request lifecycle:
124//!
125//! - **UserPromptSubmit**: Called before sending user prompt (can modify or block)
126//! - **PreToolUse**: Called before executing each tool (can modify input or block execution)
127//! - **PostToolUse**: Called after tool execution (can modify result)
128//!
129//! Hooks are only invoked in specific scenarios and have access to conversation history.
130//!
131//! ## Error Handling
132//!
133//! Errors are propagated immediately and leave the client in a valid state:
134//!
135//! - **HTTP errors**: Network failures, timeouts, connection issues
136//! - **API errors**: Invalid model, authentication failures, rate limits
137//! - **Parse errors**: Malformed SSE responses, invalid JSON
138//! - **Tool errors**: Tool execution failures (converted to JSON error responses)
139//! - **Hook errors**: Hook execution failures or blocked operations
140//!
141//! After an error, the client remains usable for new requests.
142//!
143//! # Examples
144//!
145//! ## Simple Single-Turn Query
146//!
147//! ```rust,no_run
148//! use open_agent::{query, AgentOptions};
149//! use futures::StreamExt;
150//!
151//! #[tokio::main]
152//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
153//! let options = AgentOptions::builder()
154//! .model("gpt-4")
155//! .api_key("sk-...")
156//! .build()?;
157//!
158//! let mut stream = query("What is Rust?", &options).await?;
159//!
160//! while let Some(block) = stream.next().await {
161//! if let open_agent::ContentBlock::Text(text) = block? {
162//! print!("{}", text.text);
163//! }
164//! }
165//!
166//! Ok(())
167//! }
168//! ```
169//!
170//! ## Multi-Turn Conversation
171//!
172//! ```rust,no_run
173//! use open_agent::{Client, AgentOptions, ContentBlock};
174//! use futures::StreamExt;
175//!
176//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
177//! let mut client = Client::new(AgentOptions::builder()
178//! .model("gpt-4")
179//! .api_key("sk-...")
180//! .build()?)?;
181//!
182//! // First question
183//! client.send("What's the capital of France?").await?;
184//! while let Some(block) = client.receive().await? {
185//! if let ContentBlock::Text(text) = block {
186//! println!("{}", text.text);
187//! }
188//! }
189//!
190//! // Follow-up question (history is maintained)
191//! client.send("What's its population?").await?;
192//! while let Some(block) = client.receive().await? {
193//! if let ContentBlock::Text(text) = block {
194//! println!("{}", text.text);
195//! }
196//! }
197//! # Ok(())
198//! # }
199//! ```
200//!
201//! ## Manual Tool Execution
202//!
203//! ```rust,no_run
204//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
205//! use serde_json::json;
206//!
207//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
208//! let calculator = Tool::new(
209//! "calculator",
210//! "Performs arithmetic operations",
211//! json!({"type": "object", "properties": {"operation": {"type": "string"}}}),
212//! |input| Box::pin(async move {
213//! // Custom execution logic
214//! Ok(json!({"result": 42}))
215//! })
216//! );
217//!
218//! let mut client = Client::new(AgentOptions::builder()
219//! .model("gpt-4")
220//! .api_key("sk-...")
221//! .tools(vec![calculator])
222//! .build()?)?;
223//!
224//! client.send("Calculate 2+2").await?;
225//!
226//! while let Some(block) = client.receive().await? {
227//! match block {
228//! ContentBlock::ToolUse(tool_use) => {
229//! println!("Model wants to use: {}", tool_use.name());
230//!
231//! // Execute tool manually
232//! let tool = client.get_tool(tool_use.name()).unwrap();
233//! let result = tool.execute(tool_use.input().clone()).await?;
234//!
235//! // Add result and continue
236//! client.add_tool_result(tool_use.id(), result)?;
237//! client.send("").await?;
238//! }
239//! ContentBlock::Text(text) => {
240//! println!("Response: {}", text.text);
241//! }
242//! ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
243//! }
244//! }
245//! # Ok(())
246//! # }
247//! ```
248//!
249//! ## Automatic Tool Execution
250//!
251//! ```rust,no_run
252//! use open_agent::{Client, AgentOptions, ContentBlock, Tool};
253//! use serde_json::json;
254//!
255//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
256//! let calculator = Tool::new(
257//! "calculator",
258//! "Performs arithmetic operations",
259//! json!({"type": "object"}),
260//! |input| Box::pin(async move { Ok(json!({"result": 42})) })
261//! );
262//!
263//! let mut client = Client::new(AgentOptions::builder()
264//! .model("gpt-4")
265//! .api_key("sk-...")
266//! .tools(vec![calculator])
267//! .auto_execute_tools(true) // Enable auto-execution
268//! .max_tool_iterations(5) // Max 5 tool rounds
269//! .build()?)?;
270//!
271//! client.send("Calculate 2+2 and then multiply by 3").await?;
272//!
273//! // Tools are executed automatically - you only get final text response
274//! while let Some(block) = client.receive().await? {
275//! if let ContentBlock::Text(text) = block {
276//! println!("{}", text.text);
277//! }
278//! }
279//! # Ok(())
280//! # }
281//! ```
282//!
283//! ## With Hooks
284//!
285//! ```ignore
286//! use open_agent::{Client, AgentOptions, Hooks, HookDecision};
287//!
288//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
289//! let hooks = Hooks::new()
290//! .add_user_prompt_submit(|event| async move {
291//! // Block prompts containing certain words
292//! if event.prompt.contains("forbidden") {
293//! return Some(HookDecision::block("Forbidden word detected"));
294//! }
295//! Some(HookDecision::continue_())
296//! })
297//! .add_pre_tool_use(|event| async move {
298//! // Log all tool uses
299//! println!("Executing tool: {}", event.tool_name);
300//! Some(HookDecision::continue_())
301//! });
302//!
303//! let mut client = Client::new(AgentOptions::builder()
304//! .model("gpt-4")
305//! .base_url("http://localhost:1234/v1")
306//! .hooks(hooks)
307//! .build()?)?;
308//!
309//! // Hooks will be executed automatically
310//! client.send("Hello!").await?;
311//! # Ok(())
312//! # }
313//! ```
314
315use crate::types::{
316 AgentOptions, ContentBlock, Message, MessageRole, OpenAIContent, OpenAIContentPart,
317 OpenAIFunction, OpenAIMessage, OpenAIRequest, OpenAIToolCall, TextBlock,
318};
319use crate::utils::{ToolCallAggregator, parse_sse_stream};
320use crate::{Error, Result};
321use futures::stream::{Stream, StreamExt};
322use std::pin::Pin;
323use std::sync::Arc;
324use std::sync::atomic::{AtomicBool, Ordering};
325use std::time::Duration;
326
327/// A pinned, boxed stream of content blocks from the model.
328///
329/// This type alias represents an asynchronous stream that yields `ContentBlock` items.
330/// Each item is wrapped in a `Result` to handle potential errors during streaming.
331///
332/// The stream is:
333/// - **Pinned** (`Pin<Box<...>>`): Required for safe async operations and self-referential types
334/// - **Boxed**: Allows dynamic dispatch and hides the concrete stream implementation
335/// - **Send**: Can be safely transferred between threads
336///
337/// # Content Blocks
338///
339/// The stream can yield several types of content blocks:
340///
341/// - **TextBlock**: Incremental text responses from the model
342/// - **ToolUseBlock**: Requests to execute a tool with specific parameters
343/// - **ToolResultBlock**: Results from tool execution (in manual mode)
344///
345/// # Error Handling
346///
347/// Errors in the stream indicate issues like:
348/// - Network failures or timeouts
349/// - Malformed SSE events
350/// - JSON parsing errors
351/// - API errors from the model provider
352///
353/// When an error occurs, the stream typically terminates. It's the caller's responsibility
354/// to handle errors appropriately.
355///
356/// # Examples
357///
358/// ```rust,no_run
359/// use open_agent::{query, AgentOptions, ContentBlock};
360/// use futures::StreamExt;
361///
362/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
363/// let options = AgentOptions::builder()
364/// .model("gpt-4")
365/// .api_key("sk-...")
366/// .build()?;
367///
368/// let mut stream = query("Hello!", &options).await?;
369///
370/// while let Some(result) = stream.next().await {
371/// match result {
372/// Ok(ContentBlock::Text(text)) => print!("{}", text.text),
373/// Ok(_) => {}, // Other block types
374/// Err(e) => eprintln!("Stream error: {}", e),
375/// }
376/// }
377/// # Ok(())
378/// # }
379/// ```
pub type ContentStream = Pin<Box<dyn Stream<Item = Result<ContentBlock>> + Send>>; // boxed for type erasure; `Send` so a stream can be moved across tasks/threads
381
382/// Simple query function for single-turn interactions without conversation history.
383///
384/// This is a stateless convenience function for simple queries that don't require
385/// multi-turn conversations. It creates a temporary HTTP client, sends a single
386/// prompt, and returns a stream of content blocks.
387///
388/// For multi-turn conversations or more control over the interaction, use [`Client`] instead.
389///
390/// # Parameters
391///
392/// - `prompt`: The user's message to send to the model
393/// - `options`: Configuration including model, API key, tools, etc.
394///
395/// # Returns
396///
397/// Returns a `ContentStream` that yields content blocks as they arrive from the model.
398/// The stream must be polled to completion to receive all blocks.
399///
400/// # Behavior
401///
402/// 1. Creates a temporary HTTP client with configured timeout
403/// 2. Builds message array (system prompt + user prompt)
404/// 3. Converts tools to OpenAI format if provided
405/// 4. Makes HTTP POST request to `/chat/completions`
406/// 5. Parses Server-Sent Events (SSE) response stream
407/// 6. Aggregates chunks into complete content blocks
408/// 7. Returns stream that yields blocks as they complete
409///
410/// # Error Handling
411///
412/// This function can return errors for:
413/// - HTTP client creation failures
414/// - Network errors during the request
415/// - API errors (authentication, invalid model, rate limits, etc.)
416/// - SSE parsing errors
417/// - JSON deserialization errors
418///
419/// # Performance Notes
420///
421/// - Creates a new HTTP client for each call (consider using `Client` for repeated queries)
422/// - Timeout is configurable via `AgentOptions::timeout` (default: 120 seconds)
423/// - Streaming begins immediately; no buffering of the full response
424///
425/// # Examples
426///
427/// ## Basic Usage
428///
429/// ```rust,no_run
430/// use open_agent::{query, AgentOptions};
431/// use futures::StreamExt;
432///
433/// #[tokio::main]
434/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
435/// let options = AgentOptions::builder()
436/// .system_prompt("You are a helpful assistant")
437/// .model("gpt-4")
438/// .api_key("sk-...")
439/// .build()?;
440///
441/// let mut stream = query("What's the capital of France?", &options).await?;
442///
443/// while let Some(block) = stream.next().await {
444/// match block? {
445/// open_agent::ContentBlock::Text(text) => {
446/// print!("{}", text.text);
447/// }
448/// open_agent::ContentBlock::ToolUse(_)
449/// | open_agent::ContentBlock::ToolResult(_)
450/// | open_agent::ContentBlock::Image(_) => {}
451/// }
452/// }
453///
454/// Ok(())
455/// }
456/// ```
457///
458/// ## With Tools
459///
460/// ```rust,no_run
461/// use open_agent::{query, AgentOptions, Tool, ContentBlock};
462/// use futures::StreamExt;
463/// use serde_json::json;
464///
465/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
466/// let calculator = Tool::new(
467/// "calculator",
468/// "Performs calculations",
469/// json!({"type": "object"}),
470/// |input| Box::pin(async move { Ok(json!({"result": 42})) })
471/// );
472///
473/// let options = AgentOptions::builder()
474/// .model("gpt-4")
475/// .api_key("sk-...")
476/// .tools(vec![calculator])
477/// .build()?;
478///
479/// let mut stream = query("Calculate 2+2", &options).await?;
480///
481/// while let Some(block) = stream.next().await {
482/// match block? {
483/// ContentBlock::ToolUse(tool_use) => {
484/// println!("Model wants to use: {}", tool_use.name());
485/// // Note: You'll need to manually execute tools and continue
486/// // the conversation. For automatic execution, use Client.
487/// }
488/// ContentBlock::Text(text) => print!("{}", text.text),
489/// ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
490/// }
491/// }
492/// # Ok(())
493/// # }
494/// ```
495///
496/// ## Error Handling
497///
498/// ```rust,no_run
499/// use open_agent::{query, AgentOptions};
500/// use futures::StreamExt;
501///
502/// # async fn example() {
503/// let options = AgentOptions::builder()
504/// .model("gpt-4")
505/// .api_key("invalid-key")
506/// .build()
507/// .unwrap();
508///
509/// match query("Hello", &options).await {
510/// Ok(mut stream) => {
511/// while let Some(result) = stream.next().await {
512/// match result {
513/// Ok(block) => println!("Block: {:?}", block),
514/// Err(e) => {
515/// eprintln!("Stream error: {}", e);
516/// break;
517/// }
518/// }
519/// }
520/// }
521/// Err(e) => eprintln!("Query failed: {}", e),
522/// }
523/// # }
524/// ```
525pub async fn query(prompt: &str, options: &AgentOptions) -> Result<ContentStream> {
526 // Create HTTP client with configured timeout
527 // The timeout applies to the entire request, not individual chunks
528 let client = reqwest::Client::builder()
529 .timeout(Duration::from_secs(options.timeout()))
530 .build()
531 .map_err(Error::Http)?;
532
533 // Build messages array for the API request
534 // OpenAI format expects an array of message objects with role and content
535 let mut messages = Vec::new();
536
537 // Add system prompt if provided
538 // System prompts set the assistant's behavior and context
539 if !options.system_prompt().is_empty() {
540 messages.push(OpenAIMessage {
541 role: "system".to_string(),
542 content: Some(OpenAIContent::Text(options.system_prompt().to_string())),
543 tool_calls: None,
544 tool_call_id: None,
545 });
546 }
547
548 // Add user prompt
549 // This is the actual query from the user
550 messages.push(OpenAIMessage {
551 role: "user".to_string(),
552 content: Some(OpenAIContent::Text(prompt.to_string())),
553 tool_calls: None,
554 tool_call_id: None,
555 });
556
557 // Convert tools to OpenAI format if any are provided
558 // Tools are described using JSON Schema for parameter validation
559 let tools = if !options.tools().is_empty() {
560 Some(
561 options
562 .tools()
563 .iter()
564 .map(|t| t.to_openai_format())
565 .collect(),
566 )
567 } else {
568 None
569 };
570
571 // Build the OpenAI-compatible request payload
572 // stream=true enables Server-Sent Events for incremental responses
573 let request = OpenAIRequest {
574 model: options.model().to_string(),
575 messages,
576 stream: true, // Critical: enables SSE streaming
577 max_tokens: options.max_tokens(),
578 temperature: Some(options.temperature()),
579 tools,
580 };
581
582 // Make HTTP POST request to the chat completions endpoint
583 let url = format!("{}/chat/completions", options.base_url());
584 let response = client
585 .post(&url)
586 .header("Authorization", format!("Bearer {}", options.api_key()))
587 .header("Content-Type", "application/json")
588 .json(&request)
589 .send()
590 .await
591 .map_err(Error::Http)?;
592
593 // Check for HTTP-level errors before processing the stream
594 // This catches authentication failures, rate limits, invalid models, etc.
595 if !response.status().is_success() {
596 let status = response.status();
597 let body = response.text().await.unwrap_or_else(|e| {
598 eprintln!("WARNING: Failed to read error response body: {}", e);
599 "Unknown error (failed to read response body)".to_string()
600 });
601 return Err(Error::api(format!("API error {}: {}", status, body)));
602 }
603
604 // Parse the Server-Sent Events (SSE) stream
605 // The response body is a stream of "data: {...}" events
606 let sse_stream = parse_sse_stream(response);
607
608 // Aggregate SSE chunks into complete content blocks
609 // ToolCallAggregator handles partial JSON and assembles complete tool calls
610 // The scan() combinator maintains state across stream items
611 let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
612 let result = match chunk_result {
613 Ok(chunk) => match aggregator.process_chunk(chunk) {
614 Ok(blocks) => {
615 if blocks.is_empty() {
616 Some(None) // Intermediate chunk, continue streaming
617 } else {
618 Some(Some(Ok(blocks))) // Complete block(s) ready
619 }
620 }
621 Err(e) => Some(Some(Err(e))), // Propagate processing error
622 },
623 Err(e) => Some(Some(Err(e))), // Propagate stream error
624 };
625 futures::future::ready(result)
626 });
627
628 // Flatten the stream to emit individual blocks
629 // filter_map removes None values (incomplete chunks)
630 // flat_map expands Vec<ContentBlock> into individual items
631 let flattened = stream
632 .filter_map(|item| async move { item })
633 .flat_map(|result| {
634 futures::stream::iter(match result {
635 Ok(blocks) => blocks.into_iter().map(Ok).collect(),
636 Err(e) => vec![Err(e)],
637 })
638 });
639
640 // Pin and box the stream for type erasure and safe async usage
641 Ok(Box::pin(flattened))
642}
643
644/// Stateful client for multi-turn conversations with automatic history management.
645///
646/// The `Client` is the primary interface for building conversational AI applications.
647/// It maintains conversation history, manages streaming responses, and provides two
648/// modes of operation: manual and automatic tool execution.
649///
650/// # State Management
651///
652/// The client maintains several pieces of state that persist across multiple turns:
653///
654/// - **Conversation History**: Complete record of all messages exchanged
655/// - **Active Stream**: Currently active SSE stream being consumed
656/// - **Interrupt Flag**: Thread-safe cancellation signal
657/// - **Auto-Execution Buffer**: Cached blocks for auto-execution mode
658///
659/// # Operating Modes
660///
661/// ## Manual Mode (default)
662///
663/// In manual mode, the client streams blocks directly to the caller. When the model
664/// requests a tool, you receive a `ToolUseBlock`, execute the tool yourself, add the
665/// result with `add_tool_result()`, and continue the conversation.
666///
667/// **Advantages**:
668/// - Full control over tool execution
669/// - Custom error handling per tool
670/// - Ability to modify tool inputs/outputs
671/// - Interactive debugging capabilities
672///
673/// ## Automatic Mode (`auto_execute_tools = true`)
674///
675/// In automatic mode, the client executes tools transparently and only returns the
676/// final text response after all tool iterations complete.
677///
678/// **Advantages**:
679/// - Simpler API for common use cases
680/// - Built-in retry logic via hooks
681/// - Automatic conversation continuation
682/// - Configurable iteration limits
683///
684/// # Thread Safety
685///
686/// The client is NOT thread-safe for concurrent use. However, the interrupt mechanism
687/// uses `Arc<AtomicBool>` which can be safely shared across threads to signal cancellation.
688///
689/// # Memory Management
690///
691/// - History grows unbounded by default (consider clearing periodically)
692/// - Streams are consumed lazily (low memory footprint during streaming)
693/// - Auto-execution buffers entire response (higher memory in auto mode)
694///
695/// # Examples
696///
697/// ## Basic Multi-Turn Conversation
698///
699/// ```rust,no_run
700/// use open_agent::{Client, AgentOptions, ContentBlock};
701///
702/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
703/// let mut client = Client::new(AgentOptions::builder()
704/// .model("gpt-4")
705/// .api_key("sk-...")
706/// .build()?)?;
707///
708/// // First question
709/// client.send("What's the capital of France?").await?;
710/// while let Some(block) = client.receive().await? {
711/// if let ContentBlock::Text(text) = block {
712/// println!("{}", text.text); // "Paris is the capital of France."
713/// }
714/// }
715///
716/// // Follow-up question - history is automatically maintained
717/// client.send("What's its population?").await?;
718/// while let Some(block) = client.receive().await? {
719/// if let ContentBlock::Text(text) = block {
720/// println!("{}", text.text); // "Paris has approximately 2.2 million people."
721/// }
722/// }
723/// # Ok(())
724/// # }
725/// ```
726///
727/// ## Manual Tool Execution
728///
729/// ```rust,no_run
730/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
731/// use serde_json::json;
732///
733/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
734/// let calculator = Tool::new(
735/// "calculator",
736/// "Performs arithmetic",
737/// json!({"type": "object"}),
738/// |input| Box::pin(async move { Ok(json!({"result": 42})) })
739/// );
740///
741/// let mut client = Client::new(AgentOptions::builder()
742/// .model("gpt-4")
743/// .api_key("sk-...")
744/// .tools(vec![calculator])
745/// .build()?)?;
746///
747/// client.send("What's 2+2?").await?;
748///
749/// while let Some(block) = client.receive().await? {
750/// match block {
751/// ContentBlock::ToolUse(tool_use) => {
752/// // Execute tool manually
753/// let result = json!({"result": 4});
754/// client.add_tool_result(tool_use.id(), result)?;
755///
756/// // Continue conversation to get model's response
757/// client.send("").await?;
758/// }
759/// ContentBlock::Text(text) => {
760/// println!("{}", text.text); // "The result is 4."
761/// }
762/// ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
763/// }
764/// }
765/// # Ok(())
766/// # }
767/// ```
768///
769/// ## Automatic Tool Execution
770///
771/// ```rust,no_run
772/// use open_agent::{Client, AgentOptions, ContentBlock, Tool};
773/// use serde_json::json;
774///
775/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
776/// let calculator = Tool::new(
777/// "calculator",
778/// "Performs arithmetic",
779/// json!({"type": "object"}),
780/// |input| Box::pin(async move { Ok(json!({"result": 42})) })
781/// );
782///
783/// let mut client = Client::new(AgentOptions::builder()
784/// .model("gpt-4")
785/// .api_key("sk-...")
786/// .tools(vec![calculator])
787/// .auto_execute_tools(true) // Enable auto-execution
788/// .build()?)?;
789///
790/// client.send("What's 2+2?").await?;
791///
792/// // Tools execute automatically - you only receive final text
793/// while let Some(block) = client.receive().await? {
794/// if let ContentBlock::Text(text) = block {
795/// println!("{}", text.text); // "The result is 4."
796/// }
797/// }
798/// # Ok(())
799/// # }
800/// ```
801///
802/// ## With Interruption
803///
804/// ```rust,no_run
805/// use open_agent::{Client, AgentOptions};
806/// use std::time::Duration;
807///
808/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
809/// let mut client = Client::new(AgentOptions::default())?;
810///
811/// // Start a long-running query
812/// client.send("Write a very long story").await?;
813///
814/// // Spawn a task to interrupt after timeout
815/// let interrupt_handle = client.interrupt_handle();
816/// tokio::spawn(async move {
817/// tokio::time::sleep(Duration::from_secs(5)).await;
818/// interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
819/// });
820///
821/// // This loop will stop when interrupted
822/// while let Some(block) = client.receive().await? {
823/// // Process blocks...
824/// }
825///
826/// // Client is still usable after interruption
827/// client.send("What's 2+2?").await?;
828/// # Ok(())
829/// # }
830/// ```
pub struct Client {
    /// Configuration (model, API key, tools, hooks, timeouts, ...).
    ///
    /// Set once at construction and never mutated afterwards; callers can
    /// inspect it through the `options()` accessor.
    options: AgentOptions,

    /// Complete conversation history as an ordered sequence of messages.
    ///
    /// Includes every role — system, user, assistant, and tool — so tool
    /// results and intermediate assistant turns are recorded too. Grows
    /// unbounded; use `clear_history()` to reset it.
    history: Vec<Message>,

    /// The SSE stream currently being consumed, if a request is in flight.
    ///
    /// Set to `Some` by `send()` and drained by `receive()`. Once the stream
    /// is exhausted, `receive()` yields `Ok(None)` and resets this to `None`.
    current_stream: Option<ContentStream>,

    /// Reusable HTTP client shared across all requests.
    ///
    /// Built once in `new()` with the timeout from `AgentOptions`; reuse
    /// enables reqwest's connection pooling for multi-turn conversations.
    http_client: reqwest::Client,

    /// Thread-safe cancellation flag.
    ///
    /// Clones of this `Arc<AtomicBool>` (via `interrupt_handle()`) may be
    /// stored to `true` from any thread/task; the next `receive()` then
    /// returns `Ok(None)` and drops the active stream. Reset to `false` at
    /// the start of each `send()`. Only one thread should drive
    /// `send()`/`receive()` itself.
    interrupted: Arc<AtomicBool>,

    /// Buffered final blocks for auto-execution mode.
    ///
    /// When `options.auto_execute_tools` is `true`, the auto-execution loop
    /// collects the final text blocks here and `receive()` hands them out
    /// one at a time. Cleared whenever a new auto-execution loop starts.
    /// Unused in manual mode.
    auto_exec_buffer: Vec<ContentBlock>,

    /// Read cursor into `auto_exec_buffer`.
    ///
    /// Index of the next block `receive()` will return in auto mode; reset
    /// to 0 whenever the buffer is refilled. Unused in manual mode.
    auto_exec_index: usize,
}
895
896impl Client {
897 /// Creates a new client with the specified configuration.
898 ///
899 /// This constructor initializes all state fields and creates a reusable HTTP client
900 /// configured with the timeout from `AgentOptions`.
901 ///
902 /// # Parameters
903 ///
904 /// - `options`: Configuration including model, API key, tools, hooks, etc.
905 ///
906 /// # Errors
907 ///
908 /// Returns an error if the HTTP client cannot be built. This can happen due to:
909 /// - Invalid TLS configuration
910 /// - System resource exhaustion
911 /// - Invalid timeout values
912 ///
913 /// # Examples
914 ///
915 /// ```rust
916 /// use open_agent::{Client, AgentOptions};
917 ///
918 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
919 /// let client = Client::new(AgentOptions::builder()
920 /// .model("gpt-4")
921 /// .base_url("http://localhost:1234/v1")
922 /// .build()?)?;
923 /// # Ok(())
924 /// # }
925 /// ```
926 pub fn new(options: AgentOptions) -> Result<Self> {
927 // Build HTTP client with configured timeout
928 // This client is reused across all requests for connection pooling
929 let http_client = reqwest::Client::builder()
930 .timeout(Duration::from_secs(options.timeout()))
931 .build()
932 .map_err(|e| Error::config(format!("Failed to build HTTP client: {}", e)))?;
933
934 Ok(Self {
935 options,
936 history: Vec::new(), // Empty conversation history
937 current_stream: None, // No active stream yet
938 http_client,
939 interrupted: Arc::new(AtomicBool::new(false)), // Not interrupted initially
940 auto_exec_buffer: Vec::new(), // Empty buffer for auto mode
941 auto_exec_index: 0, // Start at beginning of buffer
942 })
943 }
944
945 /// Sends a user message and initiates streaming of the model's response.
946 ///
947 /// This method performs several critical steps:
948 ///
949 /// 1. Executes UserPromptSubmit hooks (which can modify or block the prompt)
950 /// 2. Adds the user message to conversation history
951 /// 3. Builds and sends HTTP request to the OpenAI-compatible API
952 /// 4. Parses the SSE stream and sets up aggregation
953 /// 5. Stores the stream for consumption via `receive()`
954 ///
955 /// # Parameters
956 ///
957 /// - `prompt`: The user's message. Can be empty to continue conversation after
958 /// adding tool results (common pattern in manual tool execution mode).
959 ///
960 /// # Returns
961 ///
962 /// - `Ok(())`: Request sent successfully, call `receive()` to get blocks
963 /// - `Err(e)`: Request failed (network error, API error, hook blocked, etc.)
964 ///
965 /// # Behavior Details
966 ///
967 /// ## Hook Execution
968 ///
969 /// Before sending, UserPromptSubmit hooks are executed. Hooks can:
970 /// - Modify the prompt text
971 /// - Block the request entirely
972 /// - Access conversation history
973 ///
974 /// If a hook blocks the request, this method returns an error immediately.
975 ///
976 /// ## History Management
977 ///
978 /// The prompt is added to history BEFORE sending the request. This ensures
979 /// that history is consistent even if the request fails.
980 ///
981 /// ## Stream Setup
982 ///
983 /// The response stream is set up but not consumed. You must call `receive()`
984 /// repeatedly to get content blocks. The stream remains active until:
985 /// - All blocks are consumed (stream naturally ends)
986 /// - An error occurs
987 /// - Interrupt is triggered
988 ///
989 /// ## Interrupt Handling
990 ///
991 /// The interrupt flag is reset to `false` at the start of this method,
992 /// allowing a fresh request after a previous interruption.
993 ///
994 /// # State Changes
995 ///
996 /// - Resets `interrupted` flag to `false`
997 /// - Appends user message to `history`
998 /// - Sets `current_stream` to new SSE stream
999 /// - Does NOT modify `auto_exec_buffer` or `auto_exec_index`
1000 ///
1001 /// # Errors
1002 ///
1003 /// Returns errors for:
1004 /// - Hook blocking the prompt
1005 /// - HTTP client errors (network failure, DNS, etc.)
1006 /// - API errors (auth failure, invalid model, rate limits)
1007 /// - Invalid response format
1008 ///
1009 /// After an error, the client remains usable for new requests.
1010 ///
1011 /// # Examples
1012 ///
1013 /// ## Basic Usage
1014 ///
1015 /// ```rust,no_run
1016 /// # use open_agent::{Client, AgentOptions};
1017 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1018 /// # let mut client = Client::new(AgentOptions::default())?;
1019 /// client.send("Hello!").await?;
1020 ///
1021 /// while let Some(block) = client.receive().await? {
1022 /// // Process blocks...
1023 /// }
1024 /// # Ok(())
1025 /// # }
1026 /// ```
1027 ///
1028 /// ## Continuing After Tool Result
1029 ///
1030 /// ```rust,no_run
1031 /// # use open_agent::{Client, AgentOptions, ContentBlock};
1032 /// # use serde_json::json;
1033 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1034 /// # let mut client = Client::new(AgentOptions::default())?;
1035 /// client.send("Use the calculator").await?;
1036 ///
1037 /// while let Some(block) = client.receive().await? {
1038 /// if let ContentBlock::ToolUse(tool_use) = block {
1039 /// // Execute tool and add result
1040 /// client.add_tool_result(tool_use.id(), json!({"result": 42}))?;
1041 ///
1042 /// // Continue conversation with empty prompt
1043 /// client.send("").await?;
1044 /// }
1045 /// }
1046 /// # Ok(())
1047 /// # }
1048 /// ```
1049 pub async fn send(&mut self, prompt: &str) -> Result<()> {
1050 use crate::hooks::UserPromptSubmitEvent;
1051
1052 // Reset interrupt flag for new query
1053 // This allows the client to be reused after a previous interruption
1054 // Uses SeqCst ordering to ensure visibility across all threads
1055 self.interrupted.store(false, Ordering::SeqCst);
1056
1057 // Execute UserPromptSubmit hooks
1058 // Hooks run BEFORE adding to history, allowing modification or blocking
1059 let mut final_prompt = prompt.to_string();
1060 let history_snapshot: Vec<serde_json::Value> = self
1061 .history
1062 .iter()
1063 .map(|_| serde_json::json!({})) // Simplified snapshot for hooks
1064 .collect();
1065
1066 // Create hook event with current prompt and history
1067 let event = UserPromptSubmitEvent::new(final_prompt.clone(), history_snapshot);
1068
1069 // Execute all registered UserPromptSubmit hooks
1070 if let Some(decision) = self.options.hooks().execute_user_prompt_submit(event).await {
1071 // Check if hook wants to block execution
1072 if !decision.continue_execution() {
1073 return Err(Error::other(format!(
1074 "Prompt blocked by hook: {}",
1075 decision.reason().unwrap_or("")
1076 )));
1077 }
1078 // Apply any prompt modifications from hooks
1079 if let Some(modified) = decision.modified_prompt() {
1080 final_prompt = modified.to_string();
1081 }
1082 }
1083
1084 // Add user message to history BEFORE sending request
1085 // This ensures history consistency even if request fails
1086 // Empty prompts are still added (needed for tool continuation)
1087 self.history.push(Message::user(final_prompt));
1088
1089 // Build messages array for API request
1090 // This includes system prompt + full conversation history
1091 let mut messages = Vec::new();
1092
1093 // Add system prompt as first message if configured
1094 // System prompts are added fresh for each request (not from history)
1095 if !self.options.system_prompt().is_empty() {
1096 messages.push(OpenAIMessage {
1097 role: "system".to_string(),
1098 content: Some(OpenAIContent::Text(
1099 self.options.system_prompt().to_string(),
1100 )),
1101 tool_calls: None,
1102 tool_call_id: None,
1103 });
1104 }
1105
1106 // Convert conversation history to OpenAI message format
1107 // This includes user prompts, assistant responses, and tool results
1108 for msg in &self.history {
1109 // Separate blocks by type to determine message structure
1110 let mut text_blocks = Vec::new();
1111 let mut image_blocks = Vec::new();
1112 let mut tool_use_blocks = Vec::new();
1113 let mut tool_result_blocks = Vec::new();
1114
1115 for block in &msg.content {
1116 match block {
1117 ContentBlock::Text(text) => text_blocks.push(text),
1118 ContentBlock::Image(image) => image_blocks.push(image),
1119 ContentBlock::ToolUse(tool_use) => tool_use_blocks.push(tool_use),
1120 ContentBlock::ToolResult(tool_result) => tool_result_blocks.push(tool_result),
1121 }
1122 }
1123
1124 // Handle different message types based on content blocks
1125 // Case 1: Message contains tool results (should be separate tool messages)
1126 if !tool_result_blocks.is_empty() {
1127 for tool_result in tool_result_blocks {
1128 // Serialize the tool result content as JSON string
1129 let content =
1130 serde_json::to_string(tool_result.content()).unwrap_or_else(|e| {
1131 format!("{{\"error\": \"Failed to serialize: {}\"}}", e)
1132 });
1133
1134 messages.push(OpenAIMessage {
1135 role: "tool".to_string(),
1136 content: Some(OpenAIContent::Text(content)),
1137 tool_calls: None,
1138 tool_call_id: Some(tool_result.tool_use_id().to_string()),
1139 });
1140 }
1141 }
1142 // Case 2: Message contains tool use blocks (assistant with tool calls)
1143 else if !tool_use_blocks.is_empty() {
1144 // Build tool_calls array
1145 let tool_calls: Vec<OpenAIToolCall> = tool_use_blocks
1146 .iter()
1147 .map(|tool_use| {
1148 // Serialize the input as a JSON string (OpenAI API requirement)
1149 let arguments = serde_json::to_string(tool_use.input())
1150 .unwrap_or_else(|_| "{}".to_string());
1151
1152 OpenAIToolCall {
1153 id: tool_use.id().to_string(),
1154 call_type: "function".to_string(),
1155 function: OpenAIFunction {
1156 name: tool_use.name().to_string(),
1157 arguments,
1158 },
1159 }
1160 })
1161 .collect();
1162
1163 // Extract any text content (some models include reasoning before tool calls)
1164 // Note: OpenAI API requires content field even if empty when tool_calls present
1165 let content = if !text_blocks.is_empty() {
1166 let text = text_blocks
1167 .iter()
1168 .map(|t| t.text.as_str())
1169 .collect::<Vec<_>>()
1170 .join("\n");
1171 Some(OpenAIContent::Text(text))
1172 } else {
1173 // Empty string satisfies OpenAI API schema (content is required)
1174 Some(OpenAIContent::Text(String::new()))
1175 };
1176
1177 messages.push(OpenAIMessage {
1178 role: "assistant".to_string(),
1179 content,
1180 tool_calls: Some(tool_calls),
1181 tool_call_id: None,
1182 });
1183 }
1184 // Case 3: Message contains images (use OpenAIContent::Parts)
1185 else if !image_blocks.is_empty() {
1186 // Log debug info about images being serialized
1187 log::debug!(
1188 "Serializing message with {} image(s) for {:?} role",
1189 image_blocks.len(),
1190 msg.role
1191 );
1192
1193 // Build content parts array preserving original order
1194 let mut content_parts = Vec::new();
1195
1196 // Re-iterate through content blocks to maintain order
1197 for block in &msg.content {
1198 match block {
1199 ContentBlock::Text(text) => {
1200 content_parts.push(OpenAIContentPart::text(&text.text));
1201 }
1202 ContentBlock::Image(image) => {
1203 // Log image details (truncate URL for privacy)
1204 let url_display = if image.url().len() > 100 {
1205 format!("{}... ({} chars)", &image.url()[..100], image.url().len())
1206 } else {
1207 image.url().to_string()
1208 };
1209 let detail_str = match image.detail() {
1210 crate::types::ImageDetail::Low => "low",
1211 crate::types::ImageDetail::High => "high",
1212 crate::types::ImageDetail::Auto => "auto",
1213 };
1214 log::debug!(" - Image: {} (detail: {})", url_display, detail_str);
1215
1216 content_parts.push(OpenAIContentPart::from_image(image));
1217 }
1218 ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
1219 }
1220 }
1221
1222 // Defensive check: content_parts should never be empty at this point
1223 // If it is, it indicates a logic error (e.g., all blocks were filtered out)
1224 if content_parts.is_empty() {
1225 return Err(Error::other(
1226 "Internal error: Message with images produced empty content array",
1227 ));
1228 }
1229
1230 let role_str = match msg.role {
1231 MessageRole::System => "system",
1232 MessageRole::User => "user",
1233 MessageRole::Assistant => "assistant",
1234 MessageRole::Tool => "tool",
1235 };
1236
1237 messages.push(OpenAIMessage {
1238 role: role_str.to_string(),
1239 content: Some(OpenAIContent::Parts(content_parts)),
1240 tool_calls: None,
1241 tool_call_id: None,
1242 });
1243 }
1244 // Case 4: Message contains only text (normal message, backward compatible)
1245 else {
1246 let content = text_blocks
1247 .iter()
1248 .map(|t| t.text.as_str())
1249 .collect::<Vec<_>>()
1250 .join("\n");
1251
1252 let role_str = match msg.role {
1253 MessageRole::System => "system",
1254 MessageRole::User => "user",
1255 MessageRole::Assistant => "assistant",
1256 MessageRole::Tool => "tool",
1257 };
1258
1259 messages.push(OpenAIMessage {
1260 role: role_str.to_string(),
1261 content: Some(OpenAIContent::Text(content)),
1262 tool_calls: None,
1263 tool_call_id: None,
1264 });
1265 }
1266 }
1267
1268 // Convert tools to OpenAI format if any are registered
1269 // Each tool is described with name, description, and JSON Schema parameters
1270 let tools = if !self.options.tools().is_empty() {
1271 Some(
1272 self.options
1273 .tools()
1274 .iter()
1275 .map(|t| t.to_openai_format())
1276 .collect(),
1277 )
1278 } else {
1279 None
1280 };
1281
1282 // Build the OpenAI-compatible request payload
1283 let request = OpenAIRequest {
1284 model: self.options.model().to_string(),
1285 messages,
1286 stream: true, // Always stream for progressive rendering
1287 max_tokens: self.options.max_tokens(),
1288 temperature: Some(self.options.temperature()),
1289 tools,
1290 };
1291
1292 // Make HTTP POST request to chat completions endpoint
1293 let url = format!("{}/chat/completions", self.options.base_url());
1294 let response = self
1295 .http_client
1296 .post(&url)
1297 .header(
1298 "Authorization",
1299 format!("Bearer {}", self.options.api_key()),
1300 )
1301 .header("Content-Type", "application/json")
1302 .json(&request)
1303 .send()
1304 .await
1305 .map_err(Error::Http)?;
1306
1307 // Check for HTTP-level errors before processing stream
1308 // This catches authentication, rate limits, invalid models, etc.
1309 if !response.status().is_success() {
1310 let status = response.status();
1311 let body = response.text().await.unwrap_or_else(|e| {
1312 eprintln!("WARNING: Failed to read error response body: {}", e);
1313 "Unknown error (failed to read response body)".to_string()
1314 });
1315 return Err(Error::api(format!("API error {}: {}", status, body)));
1316 }
1317
1318 // Parse Server-Sent Events (SSE) stream from response
1319 let sse_stream = parse_sse_stream(response);
1320
1321 // Aggregate SSE chunks into complete content blocks
1322 // ToolCallAggregator maintains state to handle incremental JSON chunks
1323 // that may arrive split across multiple SSE events
1324 let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
1325 let result = match chunk_result {
1326 Ok(chunk) => match aggregator.process_chunk(chunk) {
1327 Ok(blocks) => {
1328 if blocks.is_empty() {
1329 Some(None) // Partial chunk, keep aggregating
1330 } else {
1331 Some(Some(Ok(blocks))) // Complete block(s) ready
1332 }
1333 }
1334 Err(e) => Some(Some(Err(e))), // Processing error
1335 },
1336 Err(e) => Some(Some(Err(e))), // Stream error
1337 };
1338 futures::future::ready(result)
1339 });
1340
1341 // Flatten the stream to emit individual blocks
1342 // filter_map removes None values (partial chunks)
1343 // flat_map converts Vec<ContentBlock> to individual items
1344 let flattened = stream
1345 .filter_map(|item| async move { item })
1346 .flat_map(|result| {
1347 futures::stream::iter(match result {
1348 Ok(blocks) => blocks.into_iter().map(Ok).collect(),
1349 Err(e) => vec![Err(e)],
1350 })
1351 });
1352
1353 // Store the stream for consumption via receive()
1354 // The stream is NOT consumed here - that happens in receive()
1355 self.current_stream = Some(Box::pin(flattened));
1356
1357 Ok(())
1358 }
1359
1360 /// Internal method that returns one block from the current stream.
1361 ///
1362 /// This is the core streaming logic extracted for reuse by both manual mode
1363 /// and auto-execution mode. It handles interrupt checking and stream consumption.
1364 ///
1365 /// # Returns
1366 ///
1367 /// - `Ok(Some(block))`: Successfully received a content block
1368 /// - `Ok(None)`: Stream ended naturally or was interrupted
1369 /// - `Err(e)`: An error occurred during streaming
1370 ///
1371 /// # State Changes
1372 ///
    /// - Sets `current_stream` to `None` only when interrupted; a naturally
    ///   ended stream is left in place and simply yields no more items
1374 /// - Does not modify history or other state
1375 ///
1376 /// # Implementation Notes
1377 ///
1378 /// This method checks the interrupt flag on every call, allowing responsive
1379 /// cancellation. The check uses SeqCst ordering for immediate visibility of
1380 /// interrupts from other threads.
1381 async fn receive_one(&mut self) -> Result<Option<ContentBlock>> {
1382 // Check interrupt flag before attempting to receive
1383 // Uses SeqCst to ensure we see the latest value from any thread
1384 if self.interrupted.load(Ordering::SeqCst) {
1385 // Clear the stream and return None to signal completion
1386 self.current_stream = None;
1387 return Ok(None);
1388 }
1389
1390 // Poll the current stream if one exists
1391 if let Some(stream) = &mut self.current_stream {
1392 match stream.next().await {
1393 Some(Ok(block)) => Ok(Some(block)), // Got a block
1394 Some(Err(e)) => Err(e), // Stream error
1395 None => Ok(None), // Stream ended
1396 }
1397 } else {
1398 // No active stream
1399 Ok(None)
1400 }
1401 }
1402
1403 /// Collects all blocks from the current stream into a vector.
1404 ///
1405 /// Internal helper for auto-execution mode. This method buffers the entire
1406 /// response in memory, which is necessary to determine if the response contains
1407 /// tool calls before returning anything to the caller.
1408 ///
1409 /// # Returns
1410 ///
1411 /// - `Ok(vec)`: Successfully collected all blocks
1412 /// - `Err(e)`: Error during collection or interrupted
1413 ///
1414 /// # Memory Usage
1415 ///
1416 /// This buffers the entire response, which can be large for long completions.
1417 /// Consider the memory implications when using auto-execution mode.
1418 ///
1419 /// # Interruption
1420 ///
1421 /// Checks interrupt flag during collection and returns error if interrupted.
1422 async fn collect_all_blocks(&mut self) -> Result<Vec<ContentBlock>> {
1423 let mut blocks = Vec::new();
1424
1425 // Consume entire stream into vector
1426 while let Some(block) = self.receive_one().await? {
1427 // Check interrupt during collection for responsiveness
1428 if self.interrupted.load(Ordering::SeqCst) {
1429 self.current_stream = None;
1430 return Err(Error::other(
1431 "Operation interrupted during block collection",
1432 ));
1433 }
1434
1435 blocks.push(block);
1436 }
1437
1438 Ok(blocks)
1439 }
1440
1441 /// Executes a tool by name with the given input.
1442 ///
1443 /// Internal helper for auto-execution mode. Looks up the tool in the registered
1444 /// tools list and executes it with the provided input.
1445 ///
1446 /// # Parameters
1447 ///
1448 /// - `tool_name`: Name of the tool to execute
1449 /// - `input`: JSON value containing tool parameters
1450 ///
1451 /// # Returns
1452 ///
1453 /// - `Ok(result)`: Tool executed successfully, returns result as JSON
1454 /// - `Err(e)`: Tool not found or execution failed
1455 ///
1456 /// # Error Handling
1457 ///
1458 /// If the tool is not found in the registry, returns a ToolError.
1459 /// If execution fails, the error from the tool is propagated.
1460 async fn execute_tool_internal(
1461 &self,
1462 tool_name: &str,
1463 input: serde_json::Value,
1464 ) -> Result<serde_json::Value> {
1465 // Find tool in registered tools by name
1466 let tool = self
1467 .options
1468 .tools()
1469 .iter()
1470 .find(|t| t.name() == tool_name)
1471 .ok_or_else(|| Error::tool(format!("Tool '{}' not found", tool_name)))?;
1472
1473 // Execute the tool's async function
1474 tool.execute(input).await
1475 }
1476
1477 /// Auto-execution loop that handles tool calls automatically.
1478 ///
1479 /// This is the core implementation of automatic tool execution mode. It:
1480 ///
1481 /// 1. Collects all blocks from the current stream
1482 /// 2. Separates text blocks from tool use blocks
1483 /// 3. If there are tool blocks:
1484 /// - Executes PreToolUse hooks (can modify/block)
1485 /// - Executes each tool via its registered function
1486 /// - Executes PostToolUse hooks (can modify result)
1487 /// - Adds results to history
1488 /// - Continues conversation with send("")
1489 /// 4. Repeats until text-only response or max iterations
1490 /// 5. Returns all final text blocks
1491 ///
1492 /// # Returns
1493 ///
1494 /// - `Ok(blocks)`: Final text blocks after all tool iterations
1495 /// - `Err(e)`: Error during execution, stream processing, or interruption
1496 ///
1497 /// # Iteration Limit
1498 ///
1499 /// The loop is bounded by `options.max_tool_iterations` to prevent infinite loops.
1500 /// When the limit is reached, the loop stops and returns whatever text blocks
1501 /// have been collected so far.
1502 ///
1503 /// # Hook Integration
1504 ///
1505 /// Hooks are executed for each tool call:
1506 /// - **PreToolUse**: Can modify input or block execution entirely
1507 /// - **PostToolUse**: Can modify the result before it's added to history
1508 ///
1509 /// If a hook blocks execution, a JSON error response is used as the tool result.
1510 ///
1511 /// # State Management
1512 ///
1513 /// The loop maintains history by adding:
1514 /// - Assistant messages with text + tool use blocks
1515 /// - User messages with tool result blocks
1516 ///
1517 /// This creates a proper conversation flow that the model can follow.
1518 ///
1519 /// # Error Recovery
1520 ///
1521 /// If a tool execution fails, the error is converted to a JSON error response
1522 /// and added as the tool result. This allows the conversation to continue
1523 /// and lets the model handle the error.
    async fn auto_execute_loop(&mut self) -> Result<Vec<ContentBlock>> {
        use crate::types::ToolResultBlock;

        // Track iterations to prevent infinite loops
        let mut iteration = 0;
        let max_iterations = self.options.max_tool_iterations();

        loop {
            // ========================================================================
            // STEP 1: Collect all blocks from current stream
            // ========================================================================
            // Buffer the entire response to determine if it contains tool calls
            let blocks = self.collect_all_blocks().await?;

            // Empty response means stream ended or was interrupted
            // (an interrupt raised mid-collection surfaces as Err via `?` above;
            // reaching here with no blocks means the stream yielded nothing)
            if blocks.is_empty() {
                return Ok(Vec::new());
            }

            // ========================================================================
            // STEP 2: Separate text blocks from tool use blocks
            // ========================================================================
            // The model can return a mix of text and tool calls in one response
            let mut text_blocks = Vec::new();
            let mut tool_blocks = Vec::new();

            for block in blocks {
                match block {
                    ContentBlock::Text(_) => text_blocks.push(block),
                    ContentBlock::ToolUse(_) => tool_blocks.push(block),
                    ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {} // Ignore ToolResult and Image variants
                }
            }

            // ========================================================================
            // STEP 3: Check if we're done (no tool calls)
            // ========================================================================
            // If the response contains no tool calls, we've reached the final answer
            if tool_blocks.is_empty() {
                // Add assistant's final text response to history
                if !text_blocks.is_empty() {
                    let assistant_msg = Message::assistant(text_blocks.clone());
                    self.history.push(assistant_msg);
                }
                // Return text blocks to caller via buffered receive()
                return Ok(text_blocks);
            }

            // ========================================================================
            // STEP 4: Check iteration limit BEFORE executing tools
            // ========================================================================
            // Increment counter and check if we've hit the max
            iteration += 1;
            if iteration > max_iterations {
                // Max iterations reached - stop execution and return what we have
                // This prevents infinite tool-calling loops
                // NOTE: the pending tool_blocks are dropped here; only the text
                // portion of the over-limit response is recorded in history
                if !text_blocks.is_empty() {
                    let assistant_msg = Message::assistant(text_blocks.clone());
                    self.history.push(assistant_msg);
                }
                return Ok(text_blocks);
            }

            // ========================================================================
            // STEP 5: Add assistant message to history
            // ========================================================================
            // The assistant message includes BOTH text and tool use blocks
            // This preserves the full context for future turns
            let mut all_blocks = text_blocks.clone();
            all_blocks.extend(tool_blocks.clone());
            let assistant_msg = Message::assistant(all_blocks);
            self.history.push(assistant_msg);

            // ========================================================================
            // STEP 6: Execute all tools and collect results
            // ========================================================================
            for block in tool_blocks {
                // Always matches: tool_blocks only ever holds ToolUse variants
                // (see the STEP 2 partition above)
                if let ContentBlock::ToolUse(tool_use) = block {
                    // Create simplified history snapshot for hooks
                    // TODO: Full serialization of history for hooks
                    let history_snapshot: Vec<serde_json::Value> =
                        self.history.iter().map(|_| serde_json::json!({})).collect();

                    // ============================================================
                    // Execute PreToolUse hooks
                    // ============================================================
                    use crate::hooks::PreToolUseEvent;
                    let pre_event = PreToolUseEvent::new(
                        tool_use.name().to_string(),
                        tool_use.input().clone(),
                        tool_use.id().to_string(),
                        history_snapshot.clone(),
                    );

                    // Track whether to execute and what input to use
                    let mut tool_input = tool_use.input().clone();
                    let mut should_execute = true;
                    let mut block_reason = None;

                    // Execute all PreToolUse hooks
                    if let Some(decision) =
                        self.options.hooks().execute_pre_tool_use(pre_event).await
                    {
                        if !decision.continue_execution() {
                            // Hook blocked execution
                            should_execute = false;
                            block_reason = decision.reason().map(|s| s.to_string());
                        } else if let Some(modified) = decision.modified_input() {
                            // Hook modified the input
                            tool_input = modified.clone();
                        }
                    }

                    // ============================================================
                    // Execute tool (or create error result if blocked)
                    // ============================================================
                    let result = if should_execute {
                        // Actually execute the tool
                        match self
                            .execute_tool_internal(tool_use.name(), tool_input.clone())
                            .await
                        {
                            Ok(res) => res, // Success - use the result
                            Err(e) => {
                                // Tool execution failed - convert to JSON error
                                // This allows the conversation to continue
                                serde_json::json!({
                                    "error": e.to_string(),
                                    "tool": tool_use.name(),
                                    "id": tool_use.id()
                                })
                            }
                        }
                    } else {
                        // Tool blocked by PreToolUse hook - create error result
                        serde_json::json!({
                            "error": "Tool execution blocked by hook",
                            "reason": block_reason.unwrap_or_else(|| "No reason provided".to_string()),
                            "tool": tool_use.name(),
                            "id": tool_use.id()
                        })
                    };

                    // ============================================================
                    // Execute PostToolUse hooks
                    // ============================================================
                    use crate::hooks::PostToolUseEvent;
                    let post_event = PostToolUseEvent::new(
                        tool_use.name().to_string(),
                        tool_input,
                        tool_use.id().to_string(),
                        result.clone(),
                        history_snapshot,
                    );

                    let mut final_result = result;
                    if let Some(decision) =
                        self.options.hooks().execute_post_tool_use(post_event).await
                    {
                        // PostToolUse can modify the result
                        // Note: Uses modified_input field (naming is historical)
                        if let Some(modified) = decision.modified_input() {
                            final_result = modified.clone();
                        }
                    }

                    // ============================================================
                    // Add tool result to history
                    // ============================================================
                    // Tool results are added as user messages (per OpenAI convention)
                    let tool_result = ToolResultBlock::new(tool_use.id(), final_result);
                    let tool_result_msg =
                        Message::user_with_blocks(vec![ContentBlock::ToolResult(tool_result)]);
                    self.history.push(tool_result_msg);
                }
            }

            // ========================================================================
            // STEP 7: Continue conversation to get next response
            // ========================================================================
            // Send empty string to continue - the history contains all context
            self.send("").await?;

            // Loop continues to collect and process the next response
            // This will either be more tool calls or the final text answer
        }
    }
1711
1712 /// Receives the next content block from the current stream.
1713 ///
1714 /// This is the primary method for consuming responses from the model. It works
1715 /// differently depending on the operating mode:
1716 ///
1717 /// ## Manual Mode (default)
1718 ///
1719 /// Streams blocks directly from the API response as they arrive. You receive:
1720 /// - `TextBlock`: Incremental text from the model
1721 /// - `ToolUseBlock`: Requests to execute tools
1722 /// - Other block types as they're emitted
1723 ///
1724 /// When you receive a `ToolUseBlock`, you must:
1725 /// 1. Execute the tool yourself
1726 /// 2. Call `add_tool_result()` with the result
1727 /// 3. Call `send("")` to continue the conversation
1728 ///
1729 /// ## Automatic Mode (`auto_execute_tools = true`)
1730 ///
1731 /// Transparently executes tools and only returns final text blocks. The first
1732 /// call to `receive()` triggers the auto-execution loop which:
1733 /// 1. Collects all blocks from the stream
1734 /// 2. Executes any tool calls automatically
1735 /// 3. Continues the conversation until reaching a text-only response
1736 /// 4. Buffers the final text blocks
1737 /// 5. Returns them one at a time on subsequent `receive()` calls
1738 ///
1739 /// # Returns
1740 ///
1741 /// - `Ok(Some(block))`: Successfully received a content block
1742 /// - `Ok(None)`: Stream ended normally or was interrupted
1743 /// - `Err(e)`: An error occurred during streaming or tool execution
1744 ///
1745 /// # Behavior Details
1746 ///
1747 /// ## Interruption
1748 ///
1749 /// Checks the interrupt flag on every call. If interrupted, immediately returns
1750 /// `Ok(None)` and clears the stream. The client can be reused after interruption.
1751 ///
1752 /// ## Stream Lifecycle
1753 ///
1754 /// 1. After `send()`, stream is active
1755 /// 2. Each `receive()` call yields one block
1756 /// 3. When stream ends, returns `Ok(None)`
1757 /// 4. Subsequent calls continue returning `Ok(None)` until next `send()`
1758 ///
1759 /// ## Auto-Execution Buffer
1760 ///
1761 /// In auto mode, blocks are buffered in memory. The buffer persists until
1762 /// fully consumed (index reaches length), at which point it's cleared.
1763 ///
1764 /// # State Changes
1765 ///
1766 /// - Advances stream position
1767 /// - In auto mode: May trigger entire execution loop and modify history
1768 /// - In manual mode: Only reads from stream, no history changes
1769 /// - Increments `auto_exec_index` when returning buffered blocks
1770 ///
1771 /// # Examples
1772 ///
1773 /// ## Manual Mode - Basic
1774 ///
1775 /// ```rust,no_run
1776 /// # use open_agent::{Client, AgentOptions, ContentBlock};
1777 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1778 /// # let mut client = Client::new(AgentOptions::default())?;
1779 /// client.send("Hello!").await?;
1780 ///
1781 /// while let Some(block) = client.receive().await? {
1782 /// match block {
1783 /// ContentBlock::Text(text) => print!("{}", text.text),
1784 /// ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
1785 /// }
1786 /// }
1787 /// # Ok(())
1788 /// # }
1789 /// ```
1790 ///
1791 /// ## Manual Mode - With Tools
1792 ///
1793 /// ```rust,no_run
1794 /// # use open_agent::{Client, AgentOptions, ContentBlock};
1795 /// # use serde_json::json;
1796 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1797 /// # let mut client = Client::new(AgentOptions::default())?;
1798 /// client.send("Use the calculator").await?;
1799 ///
1800 /// while let Some(block) = client.receive().await? {
1801 /// match block {
1802 /// ContentBlock::Text(text) => {
1803 /// println!("{}", text.text);
1804 /// }
1805 /// ContentBlock::ToolUse(tool_use) => {
1806 /// println!("Executing: {}", tool_use.name());
1807 ///
1808 /// // Execute tool manually
1809 /// let result = json!({"result": 42});
1810 ///
1811 /// // Add result and continue
1812 /// client.add_tool_result(tool_use.id(), result)?;
1813 /// client.send("").await?;
1814 /// }
1815 /// ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
1816 /// }
1817 /// }
1818 /// # Ok(())
1819 /// # }
1820 /// ```
1821 ///
1822 /// ## Auto Mode
1823 ///
1824 /// ```rust,no_run
1825 /// # use open_agent::{Client, AgentOptions, ContentBlock, Tool};
1826 /// # use serde_json::json;
1827 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1828 /// let mut client = Client::new(AgentOptions::builder()
1829 /// .auto_execute_tools(true)
1830 /// .build()?)?;
1831 ///
1832 /// client.send("Calculate 2+2").await?;
1833 ///
1834 /// // Tools execute automatically - you only get final text
1835 /// while let Some(block) = client.receive().await? {
1836 /// if let ContentBlock::Text(text) = block {
1837 /// println!("{}", text.text);
1838 /// }
1839 /// }
1840 /// # Ok(())
1841 /// # }
1842 /// ```
1843 ///
1844 /// ## With Error Handling
1845 ///
1846 /// ```rust,no_run
1847 /// # use open_agent::{Client, AgentOptions};
1848 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1849 /// # let mut client = Client::new(AgentOptions::default())?;
1850 /// client.send("Hello").await?;
1851 ///
1852 /// loop {
1853 /// match client.receive().await {
1854 /// Ok(Some(block)) => {
1855 /// // Process block
1856 /// }
1857 /// Ok(None) => {
1858 /// // Stream ended
1859 /// break;
1860 /// }
1861 /// Err(e) => {
1862 /// eprintln!("Error: {}", e);
1863 /// break;
1864 /// }
1865 /// }
1866 /// }
1867 /// # Ok(())
1868 /// # }
1869 /// ```
1870 ///
1871 /// Sends a pre-built message to the AI model.
1872 ///
1873 /// This method allows sending messages with images or custom content blocks
1874 /// that cannot be expressed as simple text prompts. Use the `Message` helper
1875 /// methods like [`user_with_image()`](Message::user_with_image),
1876 /// [`user_with_image_detail()`](Message::user_with_image_detail), or
1877 /// [`user_with_base64_image()`](Message::user_with_base64_image) to create
1878 /// messages with multimodal content.
1879 ///
1880 /// Unlike [`send()`](Client::send), this method:
1881 /// - Accepts pre-built `Message` objects instead of text prompts
1882 /// - Bypasses `UserPromptSubmit` hooks (since message is already constructed)
1883 /// - Enables multimodal interactions (text + images)
1884 ///
1885 /// After calling this method, use [`receive()`](Client::receive) to get the
1886 /// response content blocks.
1887 ///
1888 /// # Arguments
1889 ///
1890 /// * `message` - A pre-built message (typically created with `Message::user_with_image()` or similar helpers)
1891 ///
1892 /// # Errors
1893 ///
1894 /// Returns `Error` if:
1895 /// - Network request fails
1896 /// - Server returns an error
1897 /// - Response cannot be parsed
1898 /// - Request is interrupted via [`interrupt()`](Client::interrupt)
1899 ///
1900 /// # Example
1901 ///
1902 /// ```rust,no_run
1903 /// use open_agent::{Client, AgentOptions, Message, ImageDetail};
1904 ///
1905 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1906 /// let options = AgentOptions::builder()
1907 /// .model("gpt-4-vision-preview")
1908 /// .base_url("http://localhost:1234/v1")
1909 /// .build()?;
1910 ///
1911 /// let mut client = Client::new(options)?;
1912 ///
1913 /// // Send a message with an image
1914 /// let msg = Message::user_with_image(
1915 /// "What's in this image?",
1916 /// "https://example.com/photo.jpg"
1917 /// )?;
1918 /// client.send_message(msg).await?;
1919 ///
1920 /// // Receive the response
1921 /// while let Some(block) = client.receive().await? {
1922 /// // Process response blocks
1923 /// }
1924 /// # Ok(())
1925 /// # }
1926 /// ```
1927 pub async fn send_message(&mut self, message: Message) -> Result<()> {
1928 // Reset interrupt flag for new query
1929 // This allows the client to be reused after a previous interruption
1930 // Uses SeqCst ordering to ensure visibility across all threads
1931 self.interrupted.store(false, Ordering::SeqCst);
1932
1933 // Note: We do NOT run UserPromptSubmit hooks here because:
1934 // 1. The message is already fully constructed
1935 // 2. Hooks expect string prompts, not complex Message objects
1936 // 3. For multimodal messages, there's no single "prompt" to modify
1937
1938 // Add message to history BEFORE sending request
1939 // This ensures history consistency even if request fails
1940 self.history.push(message);
1941
1942 // The rest of the logic is identical to send() - build and execute request
1943 // Build messages array for API request
1944 // This includes system prompt + full conversation history
1945 let mut messages = Vec::new();
1946
1947 // Add system prompt as first message if configured
1948 // System prompts are added fresh for each request (not from history)
1949 if !self.options.system_prompt().is_empty() {
1950 messages.push(OpenAIMessage {
1951 role: "system".to_string(),
1952 content: Some(OpenAIContent::Text(
1953 self.options.system_prompt().to_string(),
1954 )),
1955 tool_calls: None,
1956 tool_call_id: None,
1957 });
1958 }
1959
1960 // Convert conversation history to OpenAI message format
1961 // This includes user prompts, assistant responses, and tool results
1962 for msg in &self.history {
1963 // Separate blocks by type to determine message structure
1964 let mut text_blocks = Vec::new();
1965 let mut image_blocks = Vec::new();
1966 let mut tool_use_blocks = Vec::new();
1967 let mut tool_result_blocks = Vec::new();
1968
1969 for block in &msg.content {
1970 match block {
1971 ContentBlock::Text(text) => text_blocks.push(text),
1972 ContentBlock::Image(image) => image_blocks.push(image),
1973 ContentBlock::ToolUse(tool_use) => tool_use_blocks.push(tool_use),
1974 ContentBlock::ToolResult(tool_result) => tool_result_blocks.push(tool_result),
1975 }
1976 }
1977
1978 // Handle different message types based on content blocks
1979 // Case 1: Message contains tool results (should be separate tool messages)
1980 if !tool_result_blocks.is_empty() {
1981 for tool_result in tool_result_blocks {
1982 // Serialize the tool result content as JSON string
1983 let content =
1984 serde_json::to_string(tool_result.content()).unwrap_or_else(|e| {
1985 format!("{{\"error\": \"Failed to serialize: {}\"}}", e)
1986 });
1987
1988 messages.push(OpenAIMessage {
1989 role: "tool".to_string(),
1990 content: Some(OpenAIContent::Text(content)),
1991 tool_calls: None,
1992 tool_call_id: Some(tool_result.tool_use_id().to_string()),
1993 });
1994 }
1995 }
1996 // Case 2: Message contains tool use blocks (assistant with tool calls)
1997 else if !tool_use_blocks.is_empty() {
1998 // Build tool_calls array
1999 let tool_calls: Vec<OpenAIToolCall> = tool_use_blocks
2000 .iter()
2001 .map(|tool_use| {
2002 // Serialize the input as a JSON string (OpenAI API requirement)
2003 let arguments = serde_json::to_string(tool_use.input())
2004 .unwrap_or_else(|_| "{}".to_string());
2005
2006 OpenAIToolCall {
2007 id: tool_use.id().to_string(),
2008 call_type: "function".to_string(),
2009 function: OpenAIFunction {
2010 name: tool_use.name().to_string(),
2011 arguments,
2012 },
2013 }
2014 })
2015 .collect();
2016
2017 // Extract any text content (some models include reasoning before tool calls)
2018 // Note: OpenAI API requires content field even if empty when tool_calls present
2019 let content = if !text_blocks.is_empty() {
2020 let text = text_blocks
2021 .iter()
2022 .map(|t| t.text.as_str())
2023 .collect::<Vec<_>>()
2024 .join("\n");
2025 Some(OpenAIContent::Text(text))
2026 } else {
2027 // Empty string satisfies OpenAI API schema (content is required)
2028 Some(OpenAIContent::Text(String::new()))
2029 };
2030
2031 messages.push(OpenAIMessage {
2032 role: "assistant".to_string(),
2033 content,
2034 tool_calls: Some(tool_calls),
2035 tool_call_id: None,
2036 });
2037 }
2038 // Case 3: Message contains images (use OpenAIContent::Parts)
2039 else if !image_blocks.is_empty() {
2040 // Log debug info about images being serialized
2041 log::debug!(
2042 "Serializing message with {} image(s) for {:?} role",
2043 image_blocks.len(),
2044 msg.role
2045 );
2046
2047 // Build content parts array preserving original order
2048 let mut content_parts = Vec::new();
2049
2050 // Re-iterate through content blocks to maintain order
2051 for block in &msg.content {
2052 match block {
2053 ContentBlock::Text(text) => {
2054 content_parts.push(OpenAIContentPart::text(&text.text));
2055 }
2056 ContentBlock::Image(image) => {
2057 // Log image details (truncate URL for privacy)
2058 let url_display = if image.url().len() > 100 {
2059 format!("{}... ({} chars)", &image.url()[..100], image.url().len())
2060 } else {
2061 image.url().to_string()
2062 };
2063 let detail_str = match image.detail() {
2064 crate::types::ImageDetail::Low => "low",
2065 crate::types::ImageDetail::High => "high",
2066 crate::types::ImageDetail::Auto => "auto",
2067 };
2068 log::debug!(" - Image: {} (detail: {})", url_display, detail_str);
2069
2070 content_parts.push(OpenAIContentPart::from_image(image));
2071 }
2072 ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
2073 }
2074 }
2075
2076 // Defensive check: content_parts should never be empty at this point
2077 // If it is, it indicates a logic error (e.g., all blocks were filtered out)
2078 if content_parts.is_empty() {
2079 return Err(Error::other(
2080 "Internal error: Message with images produced empty content array",
2081 ));
2082 }
2083
2084 let role_str = match msg.role {
2085 MessageRole::System => "system",
2086 MessageRole::User => "user",
2087 MessageRole::Assistant => "assistant",
2088 MessageRole::Tool => "tool",
2089 };
2090
2091 messages.push(OpenAIMessage {
2092 role: role_str.to_string(),
2093 content: Some(OpenAIContent::Parts(content_parts)),
2094 tool_calls: None,
2095 tool_call_id: None,
2096 });
2097 }
2098 // Case 4: Message contains only text (normal message, backward compatible)
2099 else {
2100 let content = text_blocks
2101 .iter()
2102 .map(|t| t.text.as_str())
2103 .collect::<Vec<_>>()
2104 .join("\n");
2105
2106 let role_str = match msg.role {
2107 MessageRole::System => "system",
2108 MessageRole::User => "user",
2109 MessageRole::Assistant => "assistant",
2110 MessageRole::Tool => "tool",
2111 };
2112
2113 messages.push(OpenAIMessage {
2114 role: role_str.to_string(),
2115 content: Some(OpenAIContent::Text(content)),
2116 tool_calls: None,
2117 tool_call_id: None,
2118 });
2119 }
2120 }
2121
2122 // Convert tools to OpenAI format if any are registered
2123 let tools = if !self.options.tools().is_empty() {
2124 Some(
2125 self.options
2126 .tools()
2127 .iter()
2128 .map(|t| t.to_openai_format())
2129 .collect(),
2130 )
2131 } else {
2132 None
2133 };
2134
2135 // Build the OpenAI-compatible request payload
2136 let request = OpenAIRequest {
2137 model: self.options.model().to_string(),
2138 messages,
2139 stream: true,
2140 max_tokens: self.options.max_tokens(),
2141 temperature: Some(self.options.temperature()),
2142 tools,
2143 };
2144
2145 // Make HTTP POST request to chat completions endpoint
2146 let url = format!("{}/chat/completions", self.options.base_url());
2147 let response = self
2148 .http_client
2149 .post(&url)
2150 .header(
2151 "Authorization",
2152 format!("Bearer {}", self.options.api_key()),
2153 )
2154 .header("Content-Type", "application/json")
2155 .json(&request)
2156 .send()
2157 .await
2158 .map_err(Error::Http)?;
2159
2160 // Check for HTTP-level errors
2161 if !response.status().is_success() {
2162 let status = response.status();
2163 let body = response.text().await.unwrap_or_else(|e| {
2164 eprintln!("WARNING: Failed to read error response body: {}", e);
2165 "Unknown error (failed to read response body)".to_string()
2166 });
2167 return Err(Error::api(format!("API error {}: {}", status, body)));
2168 }
2169
2170 // Parse Server-Sent Events stream
2171 let sse_stream = parse_sse_stream(response);
2172
2173 // Aggregate SSE chunks into complete content blocks
2174 let stream = sse_stream.scan(ToolCallAggregator::new(), |aggregator, chunk_result| {
2175 let result = match chunk_result {
2176 Ok(chunk) => match aggregator.process_chunk(chunk) {
2177 Ok(blocks) => {
2178 if blocks.is_empty() {
2179 Some(None) // Partial chunk
2180 } else {
2181 Some(Some(Ok(blocks))) // Complete blocks
2182 }
2183 }
2184 Err(e) => Some(Some(Err(e))),
2185 },
2186 Err(e) => Some(Some(Err(e))),
2187 };
2188 futures::future::ready(result)
2189 });
2190
2191 // Flatten nested Options and filter out None values
2192 let stream = stream.filter_map(futures::future::ready);
2193
2194 // Flatten Vec<ContentBlock> into individual blocks
2195 let stream = stream.flat_map(|result| {
2196 futures::stream::iter(match result {
2197 Ok(blocks) => blocks.into_iter().map(Ok).collect(),
2198 Err(e) => vec![Err(e)],
2199 })
2200 });
2201
2202 // Store the content stream for receive() to consume
2203 self.current_stream = Some(Box::pin(stream));
2204
2205 Ok(())
2206 }
2207
    /// Receives the next content block from the current response.
    ///
    /// In manual mode this yields blocks directly from the active API stream.
    /// In auto-execution mode the first call runs the tool-execution loop and
    /// buffers the final blocks, which are then replayed one per call.
    ///
    /// Returns `Ok(Some(block))` while blocks are available, `Ok(None)` once
    /// the stream (or the auto-exec buffer) is exhausted, and `Err(e)` on
    /// streaming or tool-execution failure.
    pub async fn receive(&mut self) -> Result<Option<ContentBlock>> {
        // ========================================================================
        // AUTO-EXECUTION MODE
        // ========================================================================
        if self.options.auto_execute_tools() {
            // Check if we have buffered blocks to return
            // In auto mode, all final text blocks are buffered and returned one at a time
            if self.auto_exec_index < self.auto_exec_buffer.len() {
                // Return next buffered block
                let block = self.auto_exec_buffer[self.auto_exec_index].clone();
                self.auto_exec_index += 1;
                return Ok(Some(block));
            }

            // No buffered blocks - need to run auto-execution loop
            // This only happens on the first receive() call after send()
            if self.auto_exec_buffer.is_empty() {
                match self.auto_execute_loop().await {
                    Ok(blocks) => {
                        // Buffer all final text blocks
                        self.auto_exec_buffer = blocks;
                        self.auto_exec_index = 0;

                        // If no blocks, return None (empty response)
                        if self.auto_exec_buffer.is_empty() {
                            return Ok(None);
                        }

                        // Return first buffered block
                        let block = self.auto_exec_buffer[0].clone();
                        self.auto_exec_index = 1;
                        return Ok(Some(block));
                    }
                    Err(e) => return Err(e),
                }
            }

            // Buffer exhausted - return None
            // NOTE(review): the buffer is NOT cleared here, even though the doc
            // comment on this method says it is cleared once fully consumed.
            // Presumably send() resets `auto_exec_buffer`/`auto_exec_index` for
            // the next turn — confirm; otherwise a fully-consumed stale buffer
            // would make every subsequent receive() return Ok(None).
            Ok(None)
        } else {
            // ====================================================================
            // MANUAL MODE
            // ====================================================================
            // Stream blocks directly from API without buffering or auto-execution
            self.receive_one().await
        }
    }
2255
2256 /// Interrupts the current operation by setting the interrupt flag.
2257 ///
2258 /// This method provides a thread-safe way to cancel any in-progress streaming
2259 /// operation. The interrupt flag is checked by `receive()` before each block,
2260 /// allowing responsive cancellation.
2261 ///
2262 /// # Behavior
2263 ///
2264 /// - Sets the atomic interrupt flag to `true`
2265 /// - Next `receive()` call will return `Ok(None)` and clear the stream
2266 /// - Flag is automatically reset to `false` on next `send()` call
2267 /// - Safe to call from any thread (uses atomic operations)
2268 /// - Idempotent: calling multiple times has same effect as calling once
2269 /// - No-op if no operation is in progress
2270 ///
2271 /// # Thread Safety
2272 ///
2273 /// This method uses `Arc<AtomicBool>` internally, which can be safely shared
2274 /// across threads. You can clone the interrupt handle and use it from different
2275 /// threads or async tasks:
2276 ///
2277 /// ```rust,no_run
2278 /// # use open_agent::{Client, AgentOptions};
2279 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2280 /// let mut client = Client::new(AgentOptions::default())?;
2281 /// let interrupt_handle = client.interrupt_handle();
2282 ///
2283 /// // Use from another thread
2284 /// tokio::spawn(async move {
2285 /// tokio::time::sleep(std::time::Duration::from_secs(5)).await;
2286 /// interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
2287 /// });
2288 /// # Ok(())
2289 /// # }
2290 /// ```
2291 ///
2292 /// # State Changes
2293 ///
2294 /// - Sets `interrupted` flag to `true`
2295 /// - Does NOT modify stream, history, or other state directly
2296 /// - Effect takes place on next `receive()` call
2297 ///
2298 /// # Use Cases
2299 ///
2300 /// - User cancellation (e.g., stop button in UI)
2301 /// - Timeout enforcement
2302 /// - Resource cleanup
2303 /// - Emergency shutdown
2304 ///
2305 /// # Examples
2306 ///
2307 /// ## Basic Interruption
2308 ///
2309 /// ```rust,no_run
2310 /// use open_agent::{Client, AgentOptions};
2311 ///
2312 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2313 /// let mut client = Client::new(AgentOptions::default())?;
2314 ///
2315 /// client.send("Tell me a long story").await?;
2316 ///
2317 /// // Interrupt after receiving some blocks
2318 /// let mut count = 0;
2319 /// while let Some(block) = client.receive().await? {
2320 /// count += 1;
2321 /// if count >= 5 {
2322 /// client.interrupt();
2323 /// }
2324 /// }
2325 ///
2326 /// // Client is ready for new queries
2327 /// client.send("What's 2+2?").await?;
2328 /// # Ok(())
2329 /// # }
2330 /// ```
2331 ///
2332 /// ## With Timeout
2333 ///
2334 /// ```rust,no_run
2335 /// use open_agent::{Client, AgentOptions};
2336 /// use std::time::Duration;
2337 ///
2338 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2339 /// let mut client = Client::new(AgentOptions::default())?;
2340 ///
2341 /// client.send("Long request").await?;
2342 ///
2343 /// // Spawn timeout task
2344 /// let interrupt_handle = client.interrupt_handle();
2345 /// tokio::spawn(async move {
2346 /// tokio::time::sleep(Duration::from_secs(10)).await;
2347 /// interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
2348 /// });
2349 ///
2350 /// while let Some(_block) = client.receive().await? {
2351 /// // Process until timeout
2352 /// }
2353 /// # Ok(())
2354 /// # }
2355 /// ```
2356 pub fn interrupt(&self) {
2357 // Set interrupt flag using SeqCst for immediate visibility across all threads
2358 self.interrupted.store(true, Ordering::SeqCst);
2359 }
2360
2361 /// Returns a clone of the interrupt handle for thread-safe cancellation.
2362 ///
2363 /// This method provides access to the shared `Arc<AtomicBool>` interrupt flag,
2364 /// allowing it to be used from other threads or async tasks to signal cancellation.
2365 ///
2366 /// # Returns
2367 ///
2368 /// A cloned `Arc<AtomicBool>` that can be used to interrupt operations from any thread.
2369 ///
2370 /// # Examples
2371 ///
2372 /// ```rust,no_run
2373 /// # use open_agent::{Client, AgentOptions};
2374 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2375 /// let mut client = Client::new(AgentOptions::default())?;
2376 /// let interrupt_handle = client.interrupt_handle();
2377 ///
2378 /// // Use from another thread
2379 /// tokio::spawn(async move {
2380 /// tokio::time::sleep(std::time::Duration::from_secs(5)).await;
2381 /// interrupt_handle.store(true, std::sync::atomic::Ordering::SeqCst);
2382 /// });
2383 /// # Ok(())
2384 /// # }
2385 /// ```
2386 pub fn interrupt_handle(&self) -> Arc<AtomicBool> {
2387 self.interrupted.clone()
2388 }
2389
2390 /// Returns a reference to the conversation history.
2391 ///
2392 /// The history contains all messages exchanged in the conversation, including:
2393 /// - User messages
2394 /// - Assistant messages (with text and tool use blocks)
2395 /// - Tool result messages
2396 ///
2397 /// # Returns
2398 ///
2399 /// A slice of `Message` objects in chronological order.
2400 ///
2401 /// # Use Cases
2402 ///
2403 /// - Inspecting conversation context
2404 /// - Debugging tool execution flow
2405 /// - Saving conversation state
2406 /// - Implementing custom history management
2407 ///
2408 /// # Examples
2409 ///
2410 /// ```rust
2411 /// # use open_agent::{Client, AgentOptions};
2412 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2413 /// let client = Client::new(AgentOptions::default())?;
2414 ///
2415 /// // Initially empty
2416 /// assert_eq!(client.history().len(), 0);
2417 /// # Ok(())
2418 /// # }
2419 /// ```
2420 pub fn history(&self) -> &[Message] {
2421 &self.history
2422 }
2423
2424 /// Returns a mutable reference to the conversation history.
2425 ///
2426 /// This allows you to modify the history directly for advanced use cases like:
2427 /// - Removing old messages to manage context length
2428 /// - Editing messages for retry scenarios
2429 /// - Injecting synthetic messages for testing
2430 ///
2431 /// # Warning
2432 ///
2433 /// Modifying history directly can lead to inconsistent conversation state if not
2434 /// done carefully. The SDK expects history to follow the proper message flow
2435 /// (user → assistant → tool results → assistant, etc.).
2436 ///
2437 /// # Examples
2438 ///
2439 /// ```rust
2440 /// # use open_agent::{Client, AgentOptions};
2441 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2442 /// let mut client = Client::new(AgentOptions::default())?;
2443 ///
2444 /// // Remove oldest messages to stay within context limit
2445 /// if client.history().len() > 50 {
2446 /// client.history_mut().drain(0..10);
2447 /// }
2448 /// # Ok(())
2449 /// # }
2450 /// ```
2451 pub fn history_mut(&mut self) -> &mut Vec<Message> {
2452 &mut self.history
2453 }
2454
2455 /// Returns a reference to the agent configuration options.
2456 ///
2457 /// Provides read-only access to the `AgentOptions` used to configure this client.
2458 ///
2459 /// # Use Cases
2460 ///
2461 /// - Inspecting current configuration
2462 /// - Debugging issues
2463 /// - Conditional logic based on settings
2464 ///
2465 /// # Examples
2466 ///
2467 /// ```rust
2468 /// # use open_agent::{Client, AgentOptions};
2469 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2470 /// let client = Client::new(AgentOptions::builder()
2471 /// .model("gpt-4")
2472 /// .base_url("http://localhost:1234/v1")
2473 /// .build()?)?;
2474 ///
2475 /// println!("Using model: {}", client.options().model());
2476 /// # Ok(())
2477 /// # }
2478 /// ```
2479 pub fn options(&self) -> &AgentOptions {
2480 &self.options
2481 }
2482
2483 /// Clears all conversation history.
2484 ///
2485 /// This resets the conversation to a blank slate while preserving the client
2486 /// configuration (tools, hooks, model, etc.). The next message will start a
2487 /// fresh conversation with no prior context.
2488 ///
2489 /// # State Changes
2490 ///
2491 /// - Clears `history` vector
2492 /// - Does NOT modify current stream, options, or other state
2493 ///
2494 /// # Use Cases
2495 ///
2496 /// - Starting a new conversation
2497 /// - Preventing context length issues
2498 /// - Clearing sensitive data
2499 /// - Implementing conversation sessions
2500 ///
2501 /// # Examples
2502 ///
2503 /// ```rust,no_run
2504 /// # use open_agent::{Client, AgentOptions, ContentBlock};
2505 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2506 /// let mut client = Client::new(AgentOptions::default())?;
2507 ///
2508 /// // First conversation
2509 /// client.send("Hello").await?;
2510 /// while let Some(_) = client.receive().await? {}
2511 ///
2512 /// // Clear and start fresh
2513 /// client.clear_history();
2514 ///
2515 /// // New conversation with no memory of previous
2516 /// client.send("Hello again").await?;
2517 /// # Ok(())
2518 /// # }
2519 /// ```
2520 pub fn clear_history(&mut self) {
2521 self.history.clear();
2522 }
2523
2524 /// Adds a tool result to the conversation history for manual tool execution.
2525 ///
2526 /// This method is used exclusively in **manual mode** after receiving a `ToolUseBlock`.
2527 /// The workflow is:
2528 ///
2529 /// 1. `receive()` returns a `ToolUseBlock`
2530 /// 2. You execute the tool yourself
2531 /// 3. Call `add_tool_result()` with the tool's output
2532 /// 4. Call `send("")` to continue the conversation
2533 /// 5. The model receives the tool result and generates a response
2534 ///
2535 /// # Parameters
2536 ///
2537 /// - `tool_use_id`: The unique ID from the `ToolUseBlock` (must match exactly)
2538 /// - `content`: The tool's output as a JSON value
2539 ///
2540 /// # Behavior
2541 ///
2542 /// Creates a `ToolResultBlock` and adds it to conversation history as a tool message.
2543 /// This preserves the tool call/result pairing that the model needs to understand
2544 /// the conversation flow.
2545 ///
2546 /// # State Changes
2547 ///
2548 /// - Appends a tool message to `history`
2549 /// - Does NOT modify stream or trigger any requests
2550 ///
2551 /// # Important Notes
2552 ///
2553 /// - **Not used in auto mode**: Auto-execution handles tool results automatically
2554 /// - **ID must match**: The `tool_use_id` must match the ID from the `ToolUseBlock`
2555 /// - **No validation**: This method doesn't validate the result format
2556 /// - **Must call send()**: After adding result(s), call `send("")` to continue
2557 ///
2558 /// # Examples
2559 ///
2560 /// ## Basic Manual Tool Execution
2561 ///
2562 /// ```rust,no_run
2563 /// use open_agent::{Client, AgentOptions, ContentBlock};
2564 /// use serde_json::json;
2565 ///
2566 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2567 /// let mut client = Client::new(AgentOptions::default())?;
2568 /// client.send("Use the calculator").await?;
2569 ///
2570 /// while let Some(block) = client.receive().await? {
2571 /// match block {
2572 /// ContentBlock::ToolUse(tool_use) => {
2573 /// // Execute tool manually
2574 /// let result = json!({"result": 42});
2575 ///
2576 /// // Add result to history
2577 /// client.add_tool_result(tool_use.id(), result)?;
2578 ///
2579 /// // Continue conversation to get model's response
2580 /// client.send("").await?;
2581 /// }
2582 /// ContentBlock::Text(text) => {
2583 /// println!("{}", text.text);
2584 /// }
2585 /// ContentBlock::ToolResult(_) | ContentBlock::Image(_) => {}
2586 /// }
2587 /// }
2588 /// # Ok(())
2589 /// # }
2590 /// ```
2591 ///
2592 /// ## Handling Tool Errors
2593 ///
2594 /// ```rust,no_run
2595 /// use open_agent::{Client, AgentOptions, ContentBlock};
2596 /// use serde_json::json;
2597 ///
2598 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2599 /// # let mut client = Client::new(AgentOptions::default())?;
2600 /// # client.send("test").await?;
2601 /// while let Some(block) = client.receive().await? {
2602 /// if let ContentBlock::ToolUse(tool_use) = block {
2603 /// // Try to execute tool
2604 /// let result = match execute_tool(tool_use.name(), tool_use.input()) {
2605 /// Ok(output) => output,
2606 /// Err(e) => json!({
2607 /// "error": e.to_string(),
2608 /// "tool": tool_use.name()
2609 /// })
2610 /// };
2611 ///
2612 /// client.add_tool_result(tool_use.id(), result)?;
2613 /// client.send("").await?;
2614 /// }
2615 /// }
2616 ///
2617 /// # fn execute_tool(name: &str, input: &serde_json::Value) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
2618 /// # Ok(json!({}))
2619 /// # }
2620 /// # Ok(())
2621 /// # }
2622 /// ```
2623 ///
2624 /// ## Multiple Tool Calls
2625 ///
2626 /// ```rust,no_run
2627 /// use open_agent::{Client, AgentOptions, ContentBlock};
2628 /// use serde_json::json;
2629 ///
2630 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2631 /// # let mut client = Client::new(AgentOptions::default())?;
2632 /// client.send("Calculate 2+2 and 3+3").await?;
2633 ///
2634 /// let mut tool_calls = Vec::new();
2635 ///
2636 /// // Collect all tool calls
2637 /// while let Some(block) = client.receive().await? {
2638 /// if let ContentBlock::ToolUse(tool_use) = block {
2639 /// tool_calls.push(tool_use);
2640 /// }
2641 /// }
2642 ///
2643 /// // Execute and add results for all tools
2644 /// for tool_call in tool_calls {
2645 /// let result = json!({"result": 42}); // Execute tool
2646 /// client.add_tool_result(tool_call.id(), result)?;
2647 /// }
2648 ///
2649 /// // Continue conversation
2650 /// client.send("").await?;
2651 /// # Ok(())
2652 /// # }
2653 /// ```
2654 pub fn add_tool_result(&mut self, tool_use_id: &str, content: serde_json::Value) -> Result<()> {
2655 use crate::types::ToolResultBlock;
2656
2657 // Create a tool result block with the given ID and content
2658 let result_block = ToolResultBlock::new(tool_use_id, content);
2659
2660 // Add to history as a tool message
2661 // Note: ToolResultBlock is properly serialized in build_api_request()
2662 // as a separate message with role="tool" and tool_call_id set
2663 let serialized = serde_json::to_string(result_block.content())
2664 .map_err(|e| Error::config(format!("Failed to serialize tool result: {}", e)))?;
2665
2666 self.history.push(Message::new(
2667 MessageRole::Tool,
2668 vec![ContentBlock::Text(TextBlock::new(serialized))],
2669 ));
2670
2671 Ok(())
2672 }
2673
2674 /// Looks up a registered tool by name.
2675 ///
2676 /// This method provides access to the tool registry for manual execution scenarios.
2677 /// It searches the tools registered in `AgentOptions` and returns a reference to
2678 /// the matching tool if found.
2679 ///
2680 /// # Parameters
2681 ///
2682 /// - `name`: The tool name to search for (case-sensitive)
2683 ///
2684 /// # Returns
2685 ///
2686 /// - `Some(&Tool)`: Tool found
2687 /// - `None`: No tool with that name
2688 ///
2689 /// # Use Cases
2690 ///
2691 /// - Manual tool execution in response to `ToolUseBlock`
2692 /// - Validating tool availability before offering features
2693 /// - Inspecting tool metadata (name, description, schema)
2694 ///
2695 /// # Examples
2696 ///
2697 /// ## Execute Tool Manually
2698 ///
2699 /// ```rust,no_run
2700 /// use open_agent::{Client, AgentOptions, ContentBlock};
2701 ///
2702 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
2703 /// # let mut client = Client::new(AgentOptions::default())?;
2704 /// # client.send("test").await?;
2705 /// while let Some(block) = client.receive().await? {
2706 /// if let ContentBlock::ToolUse(tool_use) = block {
2707 /// if let Some(tool) = client.get_tool(tool_use.name()) {
2708 /// // Execute the tool
2709 /// let result = tool.execute(tool_use.input().clone()).await?;
2710 /// client.add_tool_result(tool_use.id(), result)?;
2711 /// client.send("").await?;
2712 /// } else {
2713 /// println!("Unknown tool: {}", tool_use.name());
2714 /// }
2715 /// }
2716 /// }
2717 /// # Ok(())
2718 /// # }
2719 /// ```
2720 ///
2721 /// ## Check Tool Availability
2722 ///
2723 /// ```rust
2724 /// # use open_agent::{Client, AgentOptions};
2725 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2726 /// # let client = Client::new(AgentOptions::default())?;
2727 /// if client.get_tool("calculator").is_some() {
2728 /// println!("Calculator is available");
2729 /// }
2730 /// # Ok(())
2731 /// # }
2732 /// ```
2733 pub fn get_tool(&self, name: &str) -> Option<&crate::tools::Tool> {
2734 // Search registered tools by name
2735 self.options
2736 .tools()
2737 .iter()
2738 .find(|t| t.name() == name)
2739 .map(|t| t.as_ref())
2740 }
2741}
2742
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the minimal, valid `AgentOptions` fixture shared by every test
    /// below. Extracted to avoid repeating the builder boilerplate in each
    /// test and to keep the fixture consistent across the suite.
    fn test_options() -> AgentOptions {
        AgentOptions::builder()
            .system_prompt("Test")
            .model("test-model")
            .base_url("http://localhost:1234/v1")
            .build()
            .unwrap()
    }

    #[test]
    fn test_client_creation() {
        // A new client starts with an empty conversation history.
        let client = Client::new(test_options()).expect("Should create client successfully");
        assert_eq!(client.history().len(), 0);
    }

    #[test]
    fn test_client_new_returns_result() {
        // Client::new() must return Result instead of panicking.
        let result = Client::new(test_options());
        assert!(result.is_ok(), "Client::new() should return Ok");

        let client = result.unwrap();
        assert_eq!(client.history().len(), 0);
    }

    #[test]
    fn test_interrupt_flag_initial_state() {
        let client = Client::new(test_options()).expect("Should create client successfully");
        // A freshly created client must not be in the interrupted state.
        assert!(!client.interrupted.load(Ordering::SeqCst));
    }

    #[test]
    fn test_interrupt_sets_flag() {
        let client = Client::new(test_options()).expect("Should create client successfully");
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));
    }

    #[test]
    fn test_interrupt_idempotent() {
        let client = Client::new(test_options()).expect("Should create client successfully");
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));

        // Calling interrupt() again must leave the flag set.
        client.interrupt();
        assert!(client.interrupted.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_receive_returns_none_when_interrupted() {
        let mut client = Client::new(test_options()).expect("Should create client successfully");

        // Interrupt before receiving.
        client.interrupt();

        // receive() should return Ok(None) when interrupted.
        let result = client.receive().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_receive_returns_ok_none_when_no_stream() {
        let mut client = Client::new(test_options()).expect("Should create client successfully");

        // No stream started - receive() should return Ok(None).
        let result = client.receive().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_receive_error_propagation() {
        // Documents the receive() signature: errors are wrapped in Err(),
        // not Some(Err()), which enables clean `?` usage:
        //     while let Some(block) = client.receive().await? { ... }
        // Full error-path coverage requires a mock stream that produces errors.
        let client = Client::new(test_options()).expect("Should create client successfully");

        // Type assertion to ensure the signature is Result<Option<ContentBlock>>.
        let _: Result<Option<ContentBlock>> = std::future::ready(Ok(None)).await;
        drop(client);
    }

    #[test]
    fn test_empty_content_parts_protection() {
        // Test for Issue #3 - Verify empty content_parts causes appropriate handling.
        // This documents expected behavior: messages with images should have content.
        use crate::types::{ContentBlock, ImageBlock, Message, MessageRole};

        // GIVEN: Message with an image
        let img = ImageBlock::from_url("https://example.com/test.jpg").expect("Valid URL");
        let msg = Message::new(MessageRole::User, vec![ContentBlock::Image(img)]);

        // WHEN: Building content_parts
        let mut content_parts = Vec::new();
        for block in &msg.content {
            match block {
                ContentBlock::Text(text) => {
                    content_parts.push(crate::types::OpenAIContentPart::text(&text.text));
                }
                ContentBlock::Image(image) => {
                    content_parts.push(crate::types::OpenAIContentPart::from_image(image));
                }
                // Tool traffic contributes no OpenAI content parts here.
                ContentBlock::ToolUse(_) | ContentBlock::ToolResult(_) => {}
            }
        }

        // THEN: content_parts should not be empty
        assert!(
            !content_parts.is_empty(),
            "Messages with images should produce non-empty content_parts"
        );
    }
}