Function tool_loop_channel 

pub fn tool_loop_channel<Ctx: LoopDepth + Send + Sync + 'static>(
    provider: Arc<dyn DynProvider>,
    registry: Arc<ToolRegistry<Ctx>>,
    params: ChatParams,
    config: ToolLoopConfig,
    ctx: Arc<Ctx>,
    buffer_size: usize,
) -> (Receiver<Result<StreamEvent, LlmError>>, JoinHandle<ToolLoopResult>)
Channel-based tool loop with bounded buffer for backpressure.

Unlike tool_loop_stream, this function spawns an internal task and sends events through a bounded channel. This provides natural backpressure: if the consumer is slow, the producer blocks when the buffer is full, preventing unbounded memory growth.

Returns a tuple of:

  • Receiver<Result<StreamEvent, LlmError>> - events from the stream
  • JoinHandle<ToolLoopResult> - resolves to the final result once the loop finishes (await the handle to get it)

§Backpressure

The buffer_size parameter controls how many events can be buffered before the producer blocks. Choose based on your use case:

  • Small (4-16): Tight backpressure, minimal memory
  • Medium (32-64): Balance between latency and memory
  • Large (128+): More latency tolerance, higher memory
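
To make the effect of buffer_size concrete, here is a minimal sketch of how a bounded channel behaves. It uses the standard library's sync_channel as a synchronous stand-in for the async channel this function returns; that substitution and the helper names events_accepted_before_full and send_succeeds_after_drain are assumptions for illustration, not part of llm_stack_core.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper: pushes up to `attempts` events into a bounded channel
/// that nobody is reading and returns how many were accepted before the
/// channel reported that it was full.
fn events_accepted_before_full(buffer_size: usize, attempts: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected but is never drained.
    let (tx, _rx) = sync_channel::<u32>(buffer_size);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i as u32) {
            Ok(()) => accepted += 1,
            // A blocking `send` would wait here; this is the backpressure point.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Hypothetical helper: fills the buffer, confirms the next send is rejected,
/// then consumes one event and checks that a slot has been freed.
fn send_succeeds_after_drain(buffer_size: usize) -> bool {
    let (tx, rx) = sync_channel::<u32>(buffer_size);
    for i in 0..buffer_size {
        tx.try_send(i as u32).expect("buffer should have room");
    }
    let was_full = matches!(tx.try_send(0), Err(TrySendError::Full(_)));
    // The consumer takes one event, which frees exactly one slot.
    let _ = rx.try_recv();
    was_full && tx.try_send(0).is_ok()
}
```

With a capacity of 4 the producer gets four events ahead of the consumer and no further; a larger capacity only moves that limit, it does not remove it.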

§Consumer Drop

If the receiver is dropped before the stream completes, the internal task detects this (its next send returns an error) and terminates gracefully. The join handle still returns a ToolLoopResult, though it may indicate partial completion.
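
The drop-detection pattern described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the crate's implementation: a thread and sync_channel stand in for the internal async task and its channel, and PartialResult and run_producer_with_early_drop are hypothetical names.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical stand-in for a result that can report partial completion.
#[derive(Debug)]
struct PartialResult {
    sent: usize,
    completed: bool,
}

/// Spawns a producer that tries to send `total` events, reads `consume` of
/// them, then drops the receiver and joins the producer.
fn run_producer_with_early_drop(
    total: usize,
    consume: usize,
    buffer_size: usize,
) -> PartialResult {
    let (tx, rx) = sync_channel::<usize>(buffer_size);

    let handle = thread::spawn(move || {
        let mut sent = 0;
        for i in 0..total {
            // `send` waits while the buffer is full and returns an error
            // once the receiver has been dropped.
            if tx.send(i).is_err() {
                return PartialResult { sent, completed: false };
            }
            sent += 1;
        }
        PartialResult { sent, completed: true }
    });

    for _ in 0..consume {
        if rx.recv().is_err() {
            break;
        }
    }
    // Dropping the receiver is what the producer observes as a failed send.
    drop(rx);

    handle.join().expect("producer thread panicked")
}
```

Joining still yields a value after an early drop; the caller inspects it to tell a full run from a partial one.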

§Example

use std::sync::Arc;
use llm_stack_core::{ChatParams, ChatMessage, ToolLoopConfig, ToolRegistry, StreamEvent};
use llm_stack_core::tool::tool_loop_channel;

async fn example(
    provider: Arc<dyn llm_stack_core::DynProvider>,
    registry: Arc<ToolRegistry<()>>,
) -> Result<(), Box<dyn std::error::Error>> {
    let params = ChatParams {
        messages: vec![ChatMessage::user("Hello")],
        ..Default::default()
    };

    let (mut rx, handle) = tool_loop_channel(
        provider,
        registry,
        params,
        ToolLoopConfig::default(),
        Arc::new(()),
        32, // buffer size
    );

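    // Drain events as they arrive; `recv` yields `None` once the sender is gone.
    while let Some(event) = rx.recv().await {
        match event? {
            StreamEvent::TextDelta(text) => print!("{text}"),
            // Stop reading once the stream reports completion.
            StreamEvent::Done { .. } => break,
            _ => {}
        }
    }

    // Await the spawned task to obtain the final ToolLoopResult.
    let result = handle.await?;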
    println!("\nCompleted in {} iterations", result.iterations);
    Ok(())
}