use std::sync::Arc;
use defect_agent::error::BoxError;
use defect_agent::session::{Session, SessionCreateInfo, SessionObserver};
use futures::StreamExt;
use super::ingest::LangfuseIngest;
use super::projector::TraceProjector;
/// [`SessionObserver`] implementation that feeds session events into a
/// [`LangfuseIngest`] handle.
///
/// For every session it is notified about, the observer subscribes to that
/// session's event stream and projects each event through a
/// [`TraceProjector`] before enqueueing the results on the ingest handle.
///
/// Cloning the observer clones the contained ingest handle.
#[derive(Clone)]
pub struct LangfuseObserver {
/// Ingest handle that projected events are enqueued on; a clone of it is
/// moved into the background task started for each observed session.
ingest: LangfuseIngest,
}
impl LangfuseObserver {
#[must_use]
pub fn new(ingest: LangfuseIngest) -> Self {
Self { ingest }
}
}
impl SessionObserver for LangfuseObserver {
    /// Starts a background task that mirrors the new session's events into
    /// the ingest handle.
    ///
    /// The method subscribes to `session`'s event stream and spawns a Tokio
    /// task that, for every event received:
    ///
    /// 1. takes the current UTC time as an RFC 3339 string,
    /// 2. runs the event through a [`TraceProjector`] created for this
    ///    session's id, and
    /// 3. enqueues every projected item on a clone of this observer's
    ///    [`LangfuseIngest`].
    ///
    /// When the event stream ends, the task calls `flush` on the ingest
    /// handle once and exits. The task's join handle is dropped, so the task
    /// runs detached and this method returns without waiting for it.
    ///
    /// # Errors
    ///
    /// Returns an error when no Tokio runtime is active on the calling
    /// thread, because the background task cannot be spawned in that case.
    fn on_session_created(
        &self,
        session: Arc<dyn Session>,
        info: SessionCreateInfo,
    ) -> Result<(), BoxError> {
        // `tokio::spawn` panics when called outside a runtime. Resolve the
        // runtime explicitly so that situation is reported through the
        // `Result` this method already returns.
        let runtime = tokio::runtime::Handle::try_current()?;

        let mut events = session.subscribe();
        let ingest = self.ingest.clone();
        let session_id = info.id.0.to_string();

        runtime.spawn(async move {
            let mut projector = TraceProjector::new(session_id);
            // Source of fresh random (v4) UUID strings handed to the projector.
            let mut new_id = || uuid::Uuid::new_v4().to_string();

            while let Some(event) = events.next().await {
                // Timestamp is taken when this task receives the event, not
                // when the session emitted it.
                let now = chrono::Utc::now().to_rfc3339();
                for ev in projector.project(event, &now, &mut new_id) {
                    ingest.enqueue(ev);
                }
            }

            // The stream has ended: flush the ingest handle once before the
            // task finishes.
            ingest.flush().await;
        });

        Ok(())
    }
}