use std::rc::Rc;
use crate::value::{VmError, VmValue};
use super::api::LlmCallOptions;
use super::helpers::ResolvedProvider;
/// Stream an LLM completion over SSE, forwarding each text delta to `tx`.
///
/// Builds a provider-appropriate JSON body (Anthropic-style keeps `system`
/// as a top-level field; OpenAI-style providers get it prepended as a
/// `"role": "system"` message), opens an `EventSource`, and relays every
/// non-empty text chunk as a `VmValue::String` until the stream ends, a
/// `[DONE]` sentinel arrives, or a timeout fires.
///
/// Two deadlines apply: an idle timeout (per-event gap; `opts.idle_timeout`,
/// else the `HARN_LLM_IDLE_TIMEOUT` env var, else 30 s) and an overall budget
/// (`opts.timeout`, else 30 min).
///
/// # Errors
/// Returns `VmError::Thrown` when real-LLM use is disallowed, when the
/// `EventSource` cannot be constructed, or when either deadline is exceeded.
/// Mid-stream transport errors end the stream best-effort without an error.
pub(crate) async fn vm_stream_llm(
    opts: &LlmCallOptions,
    tx: &tokio::sync::mpsc::Sender<VmValue>,
) -> Result<(), VmError> {
    use reqwest_eventsource::{Event, EventSource};
    use tokio_stream::StreamExt;
    let provider = &opts.provider;
    // Refuse to contact a real provider unless explicitly enabled.
    super::ensure_real_llm_allowed(provider)?;
    let client = super::shared_streaming_client().clone();
    let resolved = ResolvedProvider::resolve(provider);
    // The two wire formats differ in where the system prompt lives:
    // Anthropic takes a top-level "system" field, OpenAI-style APIs expect
    // it as the first message with role "system".
    let body = if resolved.is_anthropic_style {
        let mut body = serde_json::json!({
            "model": opts.model,
            "messages": opts.messages,
            "max_tokens": opts.max_tokens,
            "stream": true,
        });
        if let Some(ref sys) = opts.system {
            body["system"] = serde_json::json!(sys);
        }
        if let Some(temp) = opts.temperature {
            body["temperature"] = serde_json::json!(temp);
        }
        body
    } else {
        let mut msgs = Vec::new();
        if let Some(ref sys) = opts.system {
            msgs.push(serde_json::json!({"role": "system", "content": sys}));
        }
        msgs.extend(opts.messages.iter().cloned());
        let mut body = serde_json::json!({
            "model": opts.model,
            "messages": msgs,
            "max_tokens": opts.max_tokens,
            "stream": true,
        });
        if let Some(temp) = opts.temperature {
            body["temperature"] = serde_json::json!(temp);
        }
        body
    };
    let req = client
        .post(resolved.url())
        .header("Content-Type", "application/json")
        .json(&body);
    // Provider-specific auth headers (e.g. x-api-key vs Authorization).
    let request = resolved.apply_headers(req, &opts.api_key);
    let mut es = EventSource::new(request).map_err(|e| {
        VmError::Thrown(VmValue::String(Rc::from(format!(
            "LLM stream setup error: {e}"
        ))))
    })?;
    // Idle timeout precedence: explicit option, then env var, then 30 s.
    let idle_timeout_secs = opts.idle_timeout.unwrap_or_else(|| {
        std::env::var("HARN_LLM_IDLE_TIMEOUT")
            .ok()
            .and_then(|v| v.parse::<u64>().ok())
            .unwrap_or(30)
    });
    let idle_dur = std::time::Duration::from_secs(idle_timeout_secs);
    // Overall budget: hard cap on total stream duration (default 30 min).
    let overall_budget_secs = opts.timeout.unwrap_or(30 * 60);
    let overall_dur = std::time::Duration::from_secs(overall_budget_secs);
    let stream_start = std::time::Instant::now();
    loop {
        if stream_start.elapsed() >= overall_dur {
            es.close();
            return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
                "stream overall deadline exceeded: {overall_budget_secs}s budget reached before stream completed"
            )))));
        }
        let remaining_overall = overall_dur.saturating_sub(stream_start.elapsed());
        // Wait no longer than the idle timeout or the remaining overall
        // budget, whichever is shorter, so one slow await cannot blow past
        // the overall deadline.
        let wait = idle_dur.min(remaining_overall);
        let event = match tokio::time::timeout(wait, es.next()).await {
            Ok(Some(event)) => event,
            // Stream exhausted: normal completion.
            Ok(None) => break,
            Err(_) => {
                es.close();
                // Distinguish which deadline actually fired. When the wait
                // was capped by the remaining overall budget, the idle
                // timeout did NOT elapse — reporting "no data received for
                // {idle_timeout_secs}s" would be false. Report the overall
                // deadline in that case.
                if stream_start.elapsed() >= overall_dur {
                    return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
                        "stream overall deadline exceeded: {overall_budget_secs}s budget reached before stream completed"
                    )))));
                }
                return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
                    "stream idle timeout: no data received for {idle_timeout_secs}s"
                )))));
            }
        };
        match event {
            Ok(Event::Message(msg)) => {
                // OpenAI-style end-of-stream sentinel.
                if msg.data == "[DONE]" {
                    break;
                }
                let chunk_text = if resolved.is_anthropic_style {
                    parse_anthropic_sse_chunk(&msg.data)
                } else {
                    parse_openai_sse_chunk(&msg.data)
                };
                if let Some(text) = chunk_text {
                    // A send error means the receiver hung up; stop streaming.
                    if !text.is_empty() && tx.send(VmValue::String(Rc::from(text))).await.is_err() {
                        break;
                    }
                }
            }
            Ok(Event::Open) => {}
            // Transport/protocol errors end the stream best-effort rather
            // than surfacing an error to the caller.
            Err(_) => break,
        }
    }
    es.close();
    Ok(())
}
/// Extract the delta text from an OpenAI-style streaming chunk.
///
/// Returns `None` when the payload is not valid JSON or does not carry a
/// `choices[0].delta.content` string (e.g. role-only or finish chunks).
fn parse_openai_sse_chunk(data: &str) -> Option<String> {
    let chunk: serde_json::Value = serde_json::from_str(data).ok()?;
    chunk
        .pointer("/choices/0/delta/content")
        .and_then(serde_json::Value::as_str)
        .map(str::to_owned)
}
/// Extract the text delta from an Anthropic-style streaming chunk.
///
/// Only `content_block_delta` events carry text; every other event type
/// (and any unparsable payload) yields `None`.
fn parse_anthropic_sse_chunk(data: &str) -> Option<String> {
    let chunk: serde_json::Value = serde_json::from_str(data).ok()?;
    match chunk["type"].as_str() {
        Some("content_block_delta") => chunk
            .pointer("/delta/text")
            .and_then(serde_json::Value::as_str)
            .map(str::to_owned),
        _ => None,
    }
}