use std::fmt;
use chrono::{DateTime, Utc};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
pub const STREAM_END_SCOPE: &str = "__weavegraph_stream_end__";
pub const INVOCATION_END_SCOPE: &str = "__weavegraph_invocation_end__";
pub const DIAGNOSTIC_SCOPE: &str = "__weavegraph_diagnostic__";
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Event {
Node(NodeEvent),
Diagnostic(DiagnosticEvent),
LLM(LLMStreamingEvent),
}
impl Event {
pub fn node_message(scope: impl Into<String>, message: impl Into<String>) -> Self {
Event::Node(NodeEvent::new(None, None, scope.into(), message.into()))
}
pub fn node_message_with_meta(
node_id: impl Into<String>,
step: u64,
scope: impl Into<String>,
message: impl Into<String>,
) -> Self {
Event::Node(NodeEvent::new(
Some(node_id.into()),
Some(step),
scope.into(),
message.into(),
))
}
pub fn node_message_with_metadata(
node_id: impl Into<String>,
step: u64,
scope: impl Into<String>,
message: impl Into<String>,
metadata: FxHashMap<String, Value>,
) -> Self {
Event::Node(
NodeEvent::new(
Some(node_id.into()),
Some(step),
scope.into(),
message.into(),
)
.with_metadata(metadata),
)
}
pub fn diagnostic(scope: impl Into<String>, message: impl Into<String>) -> Self {
Event::Diagnostic(DiagnosticEvent {
scope: scope.into(),
message: message.into(),
})
}
pub fn scope_label(&self) -> Option<&str> {
match self {
Event::Node(n) => Some(n.scope()),
Event::Diagnostic(d) => Some(d.scope()),
Event::LLM(l) => Some(l.scope().as_ref()),
}
}
pub fn message(&self) -> &str {
match self {
Event::Node(n) => n.message(),
Event::Diagnostic(d) => d.message(),
Event::LLM(l) => l.chunk(),
}
}
pub fn to_json_value(&self) -> serde_json::Value {
use serde_json::json;
let (event_type, metadata, timestamp) = match self {
Event::Node(n) => {
let mut meta: serde_json::Map<String, Value> = n
.metadata()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
if let Some(id) = n.node_id() {
meta.insert("node_id".to_owned(), json!(id));
}
if let Some(step) = n.step() {
meta.insert("step".to_owned(), json!(step));
}
("node", Value::Object(meta), Utc::now())
}
Event::Diagnostic(_) => ("diagnostic", json!({}), Utc::now()),
Event::LLM(l) => {
let mut meta: serde_json::Map<String, Value> = l
.metadata()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
if let Some(id) = l.session_id() {
meta.insert("session_id".to_owned(), json!(id));
}
if let Some(id) = l.node_id() {
meta.insert("node_id".to_owned(), json!(id));
}
if let Some(id) = l.stream_id() {
meta.insert("stream_id".to_owned(), json!(id));
}
meta.insert("is_final".to_owned(), json!(l.is_final()));
("llm", Value::Object(meta), l.timestamp())
}
};
json!({
"type": event_type,
"scope": self.scope_label(),
"message": self.message(),
"timestamp": timestamp.to_rfc3339(),
"metadata": metadata,
})
}
pub fn to_json_string(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(&self.to_json_value())
}
pub fn to_json_pretty(&self) -> Result<String, serde_json::Error> {
serde_json::to_string_pretty(&self.to_json_value())
}
}
impl fmt::Display for Event {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Event::Node(n) => match (n.node_id(), n.step()) {
(Some(id), Some(step)) => write!(f, "[{id}@{step}] {}", n.message()),
(Some(id), None) => write!(f, "[{id}] {}", n.message()),
(None, Some(step)) => write!(f, "[step {step}] {}", n.message()),
(None, None) => write!(f, "{}", n.message()),
},
Event::Diagnostic(d) => write!(f, "{}", d.message()),
Event::LLM(l) => {
if let Some(id) = l.stream_id() {
write!(f, "[LLM {id}] {}", l.chunk())
} else if let Some(id) = l.node_id() {
write!(f, "[LLM {id}] {}", l.chunk())
} else {
write!(f, "{}", l.chunk())
}
}
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct NodeEvent {
node_id: Option<String>,
step: Option<u64>,
scope: String,
message: String,
#[serde(default)]
metadata: FxHashMap<String, Value>,
}
impl NodeEvent {
pub fn new(node_id: Option<String>, step: Option<u64>, scope: String, message: String) -> Self {
Self {
node_id,
step,
scope,
message,
metadata: FxHashMap::default(),
}
}
pub fn node_id(&self) -> Option<&str> {
self.node_id.as_deref()
}
pub fn step(&self) -> Option<u64> {
self.step
}
pub fn scope(&self) -> &str {
&self.scope
}
pub fn message(&self) -> &str {
&self.message
}
pub fn metadata(&self) -> &FxHashMap<String, Value> {
&self.metadata
}
pub fn with_metadata(mut self, metadata: FxHashMap<String, Value>) -> Self {
self.metadata = metadata;
self
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct DiagnosticEvent {
scope: String,
message: String,
}
impl DiagnosticEvent {
pub fn scope(&self) -> &str {
&self.scope
}
pub fn message(&self) -> &str {
&self.message
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum LLMStreamingEventScope {
Streaming,
Chunk,
Final,
Error,
}
impl AsRef<str> for LLMStreamingEventScope {
fn as_ref(&self) -> &str {
match self {
Self::Streaming => "stream",
Self::Chunk => "chunk",
Self::Final => STREAM_END_SCOPE,
Self::Error => "error",
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct LLMStreamingEvent {
session_id: Option<String>,
node_id: Option<String>,
stream_id: Option<String>,
chunk: String,
is_final: bool,
scope: LLMStreamingEventScope,
metadata: FxHashMap<String, Value>,
timestamp: DateTime<Utc>,
}
impl LLMStreamingEvent {
pub fn builder(chunk: impl Into<String>) -> LLMStreamingEventBuilder {
LLMStreamingEventBuilder::new(chunk)
}
pub fn chunk_event(
session_id: Option<String>,
node_id: Option<String>,
stream_id: Option<String>,
chunk: impl Into<String>,
metadata: FxHashMap<String, Value>,
) -> Self {
Self {
session_id,
node_id,
stream_id,
chunk: chunk.into(),
is_final: false,
scope: LLMStreamingEventScope::Chunk,
metadata,
timestamp: Utc::now(),
}
}
pub fn final_event(
session_id: Option<String>,
node_id: Option<String>,
stream_id: Option<String>,
chunk: impl Into<String>,
metadata: FxHashMap<String, Value>,
) -> Self {
Self {
session_id,
node_id,
stream_id,
chunk: chunk.into(),
is_final: true,
scope: LLMStreamingEventScope::Final,
metadata,
timestamp: Utc::now(),
}
}
pub fn error_event(
session_id: Option<String>,
node_id: Option<String>,
stream_id: Option<String>,
error_message: impl Into<String>,
) -> Self {
let metadata = [("severity".to_owned(), Value::String("error".to_owned()))]
.into_iter()
.collect();
Self {
session_id,
node_id,
stream_id,
chunk: error_message.into(),
is_final: true,
scope: LLMStreamingEventScope::Error,
metadata,
timestamp: Utc::now(),
}
}
pub fn session_id(&self) -> Option<&str> {
self.session_id.as_deref()
}
pub fn node_id(&self) -> Option<&str> {
self.node_id.as_deref()
}
pub fn stream_id(&self) -> Option<&str> {
self.stream_id.as_deref()
}
pub fn chunk(&self) -> &str {
&self.chunk
}
pub fn is_final(&self) -> bool {
self.is_final
}
pub fn scope(&self) -> &LLMStreamingEventScope {
&self.scope
}
pub fn metadata(&self) -> &FxHashMap<String, Value> {
&self.metadata
}
pub fn timestamp(&self) -> DateTime<Utc> {
self.timestamp
}
pub fn with_metadata(mut self, metadata: FxHashMap<String, Value>) -> Self {
self.metadata = metadata;
self
}
pub fn with_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
self.timestamp = timestamp;
self
}
}
pub struct LLMStreamingEventBuilder {
session_id: Option<String>,
node_id: Option<String>,
stream_id: Option<String>,
chunk: String,
is_final: bool,
scope: LLMStreamingEventScope,
metadata: FxHashMap<String, Value>,
timestamp: Option<DateTime<Utc>>,
}
impl LLMStreamingEventBuilder {
fn new(chunk: impl Into<String>) -> Self {
Self {
session_id: None,
node_id: None,
stream_id: None,
chunk: chunk.into(),
is_final: false,
scope: LLMStreamingEventScope::Streaming,
metadata: FxHashMap::default(),
timestamp: None,
}
}
pub fn session_id(mut self, id: impl Into<String>) -> Self {
self.session_id = Some(id.into());
self
}
pub fn node_id(mut self, id: impl Into<String>) -> Self {
self.node_id = Some(id.into());
self
}
pub fn stream_id(mut self, id: impl Into<String>) -> Self {
self.stream_id = Some(id.into());
self
}
pub fn is_final(mut self, v: bool) -> Self {
self.is_final = v;
self
}
pub fn scope(mut self, s: LLMStreamingEventScope) -> Self {
self.scope = s;
self
}
pub fn metadata(mut self, m: FxHashMap<String, Value>) -> Self {
self.metadata = m;
self
}
pub fn timestamp(mut self, ts: DateTime<Utc>) -> Self {
self.timestamp = Some(ts);
self
}
pub fn build(self) -> LLMStreamingEvent {
LLMStreamingEvent {
session_id: self.session_id,
node_id: self.node_id,
stream_id: self.stream_id,
chunk: self.chunk,
is_final: self.is_final,
scope: self.scope,
metadata: self.metadata,
timestamp: self.timestamp.unwrap_or_else(Utc::now),
}
}
}