use std::fmt;
use std::pin::Pin;
use chrono::{DateTime, Utc};
use futures_util::Stream;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
use super::event::AgentEvent;
use super::run::RunId;
use crate::provider::ProviderId;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct RuntimeEventId(Uuid);
impl RuntimeEventId {
#[must_use]
pub fn new() -> Self {
Self(Uuid::now_v7())
}
#[must_use]
pub fn from_uuid(uuid: Uuid) -> Self {
Self(uuid)
}
#[must_use]
pub fn as_uuid(&self) -> Uuid {
self.0
}
}
impl Default for RuntimeEventId {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for RuntimeEventId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<RuntimeEventId> for Uuid {
fn from(value: RuntimeEventId) -> Self {
value.0
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeEventEnvelope {
pub event_id: RuntimeEventId,
pub seq: u64,
pub run_id: RunId,
pub session_id: Option<Uuid>,
pub event: AgentEvent,
pub emitted_at: DateTime<Utc>,
}
impl RuntimeEventEnvelope {
#[must_use]
pub fn run_id(&self) -> RunId {
self.event.run_id()
}
#[must_use]
pub fn is_terminal(&self) -> bool {
self.event.is_terminal()
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RuntimeRoom {
Run(RunId),
Session(Uuid),
Provider(ProviderId),
}
impl fmt::Display for RuntimeRoom {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RuntimeRoom::Run(run_id) => write!(f, "run:{run_id}"),
RuntimeRoom::Session(session_id) => write!(f, "session:{session_id}"),
RuntimeRoom::Provider(provider_id) => write!(f, "provider:{provider_id}"),
}
}
}
pub type BoxRuntimeEventStream =
Pin<Box<dyn Stream<Item = Result<RuntimeEventEnvelope, RuntimeStreamError>> + Send + 'static>>;
#[derive(Debug, Error)]
pub enum RuntimeStreamError {
#[error("runtime stream publish failed: {message}")]
Publish {
message: String,
},
#[error("runtime stream subscriber lagged, skipped {skipped} events")]
Lagged {
skipped: u64,
},
#[error("runtime stream subscribe failed: {message}")]
Subscribe {
message: String,
},
#[error("runtime stream closed")]
Closed,
}
#[cfg(test)]
mod tests {
#![allow(clippy::unwrap_used, clippy::expect_used)]
use super::*;
#[test]
fn run_room_display_is_stable() {
let id = RunId::from_uuid(Uuid::nil());
assert_eq!(RuntimeRoom::Run(id).to_string(), format!("run:{id}"));
}
#[test]
fn session_room_display_is_stable() {
let id = Uuid::nil();
assert_eq!(
RuntimeRoom::Session(id).to_string(),
format!("session:{id}")
);
}
#[test]
fn provider_room_display_is_stable() {
let id = ProviderId::new("acme");
let expected = format!("provider:{id}");
assert_eq!(RuntimeRoom::Provider(id).to_string(), expected);
}
}