use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use parking_lot::Mutex;
use serde_json::Value;
use tokio::sync::{broadcast, Notify};
use crate::types::{Step, StepStatus, StreamChunk, UsageMetadata};
pub(crate) struct LoopState<M> {
pub history: Mutex<Vec<M>>,
pub idle: Arc<AtomicBool>,
pub idle_notify: Arc<Notify>,
pub cancel: Arc<AtomicBool>,
pub steps: broadcast::Sender<Step>,
pub next_step_index: AtomicU32,
pub last_turn_usage: Mutex<Option<UsageMetadata>>,
pub last_structured_output: Mutex<Option<Value>>,
}
impl<M> LoopState<M> {
pub fn new(steps: broadcast::Sender<Step>) -> Self {
Self {
history: Mutex::new(Vec::new()),
idle: Arc::new(AtomicBool::new(true)),
idle_notify: Arc::new(Notify::new()),
cancel: Arc::new(AtomicBool::new(false)),
steps,
next_step_index: AtomicU32::new(0),
last_turn_usage: Mutex::new(None),
last_structured_output: Mutex::new(None),
}
}
pub fn alloc_step_index(&self) -> u32 {
self.next_step_index.fetch_add(1, Ordering::Relaxed)
}
pub fn emit(&self, step: Step) {
let _ = self.steps.send(step);
}
pub fn emit_chunk_step(&self, chunk: StreamChunk) {
match chunk {
StreamChunk::ToolCall(tc) => {
self.emit(Step::tool_call(self.alloc_step_index(), tc, StepStatus::Done))
}
StreamChunk::ToolResult(tr) => self.emit(Step::tool_result(self.alloc_step_index(), tr)),
_ => {}
}
}
}
pub(crate) mod history {
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::error::{Error, Result};
pub fn encode<M: Serialize>(history: &[M]) -> Result<Vec<u8>> {
serde_json::to_vec(history).map_err(|e| Error::other(format!("history_bytes: {e}")))
}
pub fn decode<M: DeserializeOwned>(bytes: &[u8]) -> Result<Vec<M>> {
if bytes.is_empty() {
return Ok(Vec::new());
}
serde_json::from_slice(bytes).map_err(|e| Error::other(format!("set_history_bytes: {e}")))
}
pub fn decode_lenient<M: DeserializeOwned>(bytes: &[u8]) -> Result<Vec<M>> {
if bytes.is_empty() {
return Ok(Vec::new());
}
let raw: Vec<serde_json::Value> = serde_json::from_slice(bytes)
.map_err(|e| Error::other(format!("decode_transcript_bytes: {e}")))?;
Ok(raw
.into_iter()
.filter_map(|v| serde_json::from_value(v).ok())
.collect())
}
}
#[cfg(test)]
pub(crate) mod transcript_contract {
use crate::types::{TranscriptEntry, TranscriptRole};
pub fn assert_single_call_result(
entries: &[TranscriptEntry],
name: &str,
) -> serde_json::Value {
let asst = entries
.iter()
.find(|e| !e.tool_calls.is_empty())
.expect("an assistant entry with a tool call");
assert!(
matches!(asst.role, TranscriptRole::Assistant),
"tool calls live on the assistant turn"
);
assert_eq!(asst.tool_calls.len(), 1, "exactly one tool call projected");
let call = &asst.tool_calls[0];
assert_eq!(call.name, name, "tool name preserved");
assert!(call.error.is_none(), "a success must not set error");
call.result
.clone()
.expect("the result is correlated back to its call")
}
pub fn assert_single_call_error(entries: &[TranscriptEntry], name: &str) -> String {
let asst = entries
.iter()
.find(|e| !e.tool_calls.is_empty())
.expect("an assistant entry with a tool call");
assert_eq!(asst.tool_calls.len(), 1, "exactly one tool call projected");
let call = &asst.tool_calls[0];
assert_eq!(call.name, name, "tool name preserved");
assert!(
call.result.is_none(),
"a failure must surface as error, not result"
);
call.error.clone().expect("the failure is the typed error")
}
}