mod stream_timeout;
pub(crate) mod sse;
mod runners;
pub use runners::BackendRunners;
pub(crate) mod state;
#[cfg(not(target_arch = "wasm32"))]
pub type KeyProvider = std::sync::Arc<dyn Fn() -> String + Send + Sync>;
#[cfg(target_arch = "wasm32")]
pub type KeyProvider = std::sync::Arc<dyn Fn() -> String>;
#[derive(Clone)]
pub struct AuthTokenProvider(pub KeyProvider);
impl std::fmt::Debug for AuthTokenProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("AuthTokenProvider(<closure>)")
}
}
pub(crate) mod dispatch;
pub(crate) mod compaction;
pub mod gemini;
pub mod mock;
#[cfg(feature = "anthropic")]
pub mod anthropic;
#[cfg(feature = "openai")]
pub mod openai;
#[cfg(feature = "local")]
pub mod local;
#[cfg(feature = "native")]
pub mod mcp;
use futures_util::stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
use crate::connections::StepStream;
use crate::error::Error;
use crate::types::{Step, StepSource, StepStatus};
#[cfg(any(feature = "anthropic", feature = "local", feature = "openai"))]
pub(crate) fn render_system(s: &crate::types::SystemInstructions) -> String {
use crate::types::SystemInstructions;
match s {
SystemInstructions::Custom(c) => c.text.clone(),
SystemInstructions::Templated(t) => {
let mut buf = String::new();
if let Some(id) = &t.identity {
buf.push_str(id);
buf.push_str("\n\n");
}
for section in &t.sections {
if !section.title.is_empty() {
buf.push_str("## ");
buf.push_str(§ion.title);
buf.push('\n');
}
buf.push_str(§ion.content);
buf.push_str("\n\n");
}
buf.trim().to_string()
}
}
}
pub(crate) fn subscribe_step_stream(
rx: tokio::sync::broadcast::Receiver<Step>,
label: &'static str,
) -> StepStream {
let mapped = BroadcastStream::new(rx).map(move |r| match r {
Ok(step)
if step.source == StepSource::System
&& step.status == StepStatus::Error
&& !step.error.is_empty() =>
{
Err(Error::other(step.error))
}
Ok(step) => Ok(step),
Err(e) => Err(Error::other(format!("{label} step lag: {e}"))),
});
#[cfg(not(target_arch = "wasm32"))]
{
mapped.boxed()
}
#[cfg(target_arch = "wasm32")]
{
mapped.boxed_local()
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::StreamExt;
#[tokio::test]
async fn turn_error_step_translates_to_stream_err() {
let (tx, rx) = tokio::sync::broadcast::channel(8);
let mut stream = subscribe_step_stream(rx, "test");
tx.send(Step::turn_error(0, "gemini HTTP 500: boom"))
.expect("subscriber is live");
match stream.next().await.expect("a stream item") {
Ok(step) => panic!("error Step leaked as Ok: {step:?}"),
Err(Error::Other(msg)) => {
assert!(msg.contains("gemini HTTP 500: boom"), "got: {msg}")
}
Err(other) => panic!("unexpected error variant: {other:?}"),
}
}
#[tokio::test]
async fn model_error_status_step_passes_through() {
let (tx, rx) = tokio::sync::broadcast::channel(8);
let mut stream = subscribe_step_stream(rx, "test");
let step = Step::turn_complete(
"t",
0,
StepStatus::Error,
"",
"stopped by safety policy",
false,
None,
None,
);
tx.send(step).expect("subscriber is live");
match stream.next().await.expect("a stream item") {
Ok(step) => assert_eq!(step.status, StepStatus::Error),
Err(e) => panic!("Model-sourced step wrongly translated: {e:?}"),
}
}
}