use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::provider::Message;
use crate::store::{
ArtifactStore, EmbeddingStore, ExecutionStore, MessageRecord, MessageRole, SessionStore,
};
use super::error::{RuntimeError, RuntimeResult};
use super::event::AgentEvent;
use super::run::{RunId, RunRecord, RunStatus};
use super::state::RunState;
/// One [`AgentEvent`] belonging to a run, paired with a sequence number and a timestamp.
///
/// Derives `Serialize`/`Deserialize`, so the record can be written out and read back
/// with serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunEventRecord {
/// Sequence number supplied by the caller of [`RunEventRecord::new`].
/// NOTE(review): presumably the position of the event within its run — confirm with producers.
pub sequence: u64,
/// Identifier of the run this event belongs to.
pub run_id: RunId,
/// The event payload itself.
pub event: AgentEvent,
/// UTC time at which the record was constructed (set from `Utc::now()` by
/// [`RunEventRecord::new`]).
pub timestamp: DateTime<Utc>,
}
impl RunEventRecord {
    /// Wraps `event` in a record for `run_id` with the given `sequence`.
    ///
    /// The `timestamp` field is taken from the current UTC time at the moment of
    /// the call.
    #[must_use]
    pub fn new(sequence: u64, run_id: RunId, event: AgentEvent) -> Self {
        let timestamp = Utc::now();
        Self {
            sequence,
            run_id,
            event,
            timestamp,
        }
    }
}
/// Storage interface for run records and the events recorded against them.
///
/// Implementors must be `Send + Sync`. Only [`RunStore::get_run_state`] and
/// [`RunStore::list_runs_filtered`] have default bodies; the one-line descriptions of
/// the required methods below are inferred from their names and signatures, so each
/// backend's own documentation is authoritative for the exact semantics.
#[async_trait]
pub trait RunStore: Send + Sync {
    /// Stores `record` as a new run.
    async fn create_run(&self, record: RunRecord) -> RuntimeResult<()>;

    /// Looks up a run by id; `Ok(None)` is the "no such run" answer relied on by
    /// [`RunStore::get_run_state`].
    async fn get_run(&self, run_id: RunId) -> RuntimeResult<Option<RunRecord>>;

    /// Builds the [`RunState`] of a run from its record and its events.
    ///
    /// Returns `Ok(None)` when [`RunStore::get_run`] finds no record. Otherwise the
    /// events are fetched with [`RunStore::list_events`] and both are handed to
    /// `RunState::create`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `get_run` or `list_events`.
    async fn get_run_state(&self, run_id: RunId) -> RuntimeResult<Option<RunState>> {
        let run = match self.get_run(run_id).await? {
            Some(found) => found,
            None => return Ok(None),
        };
        let history = self.list_events(run_id).await?;
        Ok(Some(RunState::create(&run, &history)))
    }

    /// Sets the status of the run identified by `run_id`.
    async fn update_run_status(&self, run_id: RunId, status: RunStatus) -> RuntimeResult<()>;

    /// Adds an event record to the store.
    async fn append_event(&self, record: RunEventRecord) -> RuntimeResult<()>;

    /// Returns the event records stored for `run_id`.
    /// NOTE(review): ordering of the returned events is not specified here — confirm
    /// per backend.
    async fn list_events(&self, run_id: RunId) -> RuntimeResult<Vec<RunEventRecord>>;

    /// Returns the runs associated with `session_id`.
    async fn list_runs(&self, session_id: Uuid) -> RuntimeResult<Vec<RunRecord>>;

    /// Lists runs narrowed by optional session/status filters with `limit`/`offset`
    /// paging.
    ///
    /// The default body ignores all arguments and always fails, so backends that
    /// support filtered listing must override it.
    ///
    /// # Errors
    ///
    /// The default implementation always returns a `RuntimeError::Storage` wrapping a
    /// `BackendError` for backend `"run"`.
    async fn list_runs_filtered(
        &self,
        session_id: Option<Uuid>,
        status: Option<RunStatus>,
        limit: usize,
        offset: usize,
    ) -> RuntimeResult<Vec<RunRecord>> {
        // The arguments are deliberately unused in the fallback implementation.
        let _ = (session_id, status, limit, offset);
        let unsupported = crate::StorageError::BackendError {
            backend: String::from("run"),
            message: String::from("list_runs_filtered not implemented"),
            source: None,
        };
        Err(RuntimeError::Storage(unsupported))
    }

    /// Removes the run identified by `run_id`.
    async fn delete_run(&self, run_id: RunId) -> RuntimeResult<()>;

    /// Reports whether the backend is usable; `Ok(())` presumably means healthy.
    async fn health_check(&self) -> RuntimeResult<()>;
}
/// Bundle of the storage backends used by the runtime.
///
/// The session, execution and run stores are always present; the embedding and
/// artifact stores are optional and are `None` unless attached through
/// `with_embeddings` / `with_artifacts`.
pub struct RuntimeStore {
/// Backend for sessions and their messages.
sessions: Box<dyn SessionStore>,
/// Backend exposed through `executions()`.
executions: Box<dyn ExecutionStore>,
/// Backend for run records and run events.
runs: Box<dyn RunStore>,
/// Optional embedding backend; `None` until `with_embeddings` is called.
embeddings: Option<Box<dyn EmbeddingStore>>,
/// Optional artifact backend; `None` until `with_artifacts` is called.
artifacts: Option<Box<dyn ArtifactStore>>,
}
impl RuntimeStore {
    /// Builds a store from the three mandatory backends.
    ///
    /// The embedding and artifact stores start out unset; attach them with
    /// [`Self::with_embeddings`] and [`Self::with_artifacts`].
    #[must_use]
    pub fn new(
        sessions: Box<dyn SessionStore>,
        executions: Box<dyn ExecutionStore>,
        runs: Box<dyn RunStore>,
    ) -> Self {
        Self {
            sessions,
            executions,
            runs,
            embeddings: None,
            artifacts: None,
        }
    }

    /// Returns `self` with the embedding store set to `store`, replacing any
    /// previously attached one.
    #[must_use]
    pub fn with_embeddings(mut self, store: Box<dyn EmbeddingStore>) -> Self {
        self.embeddings = Some(store);
        self
    }

    /// Returns `self` with the artifact store set to `store`, replacing any
    /// previously attached one.
    #[must_use]
    pub fn with_artifacts(mut self, store: Box<dyn ArtifactStore>) -> Self {
        self.artifacts = Some(store);
        self
    }

    /// Borrows the session store.
    #[must_use]
    pub fn sessions(&self) -> &dyn SessionStore {
        self.sessions.as_ref()
    }

    /// Borrows the execution store.
    #[must_use]
    pub fn executions(&self) -> &dyn ExecutionStore {
        self.executions.as_ref()
    }

    /// Borrows the run store.
    #[must_use]
    pub fn runs(&self) -> &dyn RunStore {
        self.runs.as_ref()
    }

    /// Borrows the embedding store, or `None` if none was attached.
    #[must_use]
    pub fn embeddings(&self) -> Option<&dyn EmbeddingStore> {
        self.embeddings.as_deref()
    }

    /// Borrows the artifact store, or `None` if none was attached.
    #[must_use]
    pub fn artifacts(&self) -> Option<&dyn ArtifactStore> {
        self.artifacts.as_deref()
    }

    /// Resolves the session to use and returns its id.
    ///
    /// * `Some(id)`: the session is looked up in the session store and `id` is
    ///   returned unchanged if it exists.
    /// * `None`: a new session titled `"Agent Run"` with model name `"default"` is
    ///   created in the session store and its id is returned.
    ///
    /// # Errors
    ///
    /// Returns [`RuntimeError::SessionNotFound`] when `Some(id)` is given but the
    /// store has no such session. Errors from the session store are converted with
    /// `RuntimeError::from` and propagated.
    pub async fn ensure_session(&self, session_id: Option<Uuid>) -> RuntimeResult<Uuid> {
        let Some(id) = session_id else {
            let session =
                crate::store::Session::new("Agent Run", crate::provider::ModelName::new("default"));
            // `Uuid` is `Copy`: read the id before the session is moved into the
            // store rather than cloning the whole session just to keep its id.
            let new_id = session.id;
            self.sessions
                .create_session(session)
                .await
                .map_err(RuntimeError::from)?;
            return Ok(new_id);
        };

        // Only existence matters here; the fetched session itself is discarded.
        self.sessions
            .get_session(&id)
            .await
            .map_err(RuntimeError::from)?
            .ok_or(RuntimeError::SessionNotFound(id))?;
        Ok(id)
    }

    /// Converts `message` into a record for `session_id`, appends it through the
    /// session store, and returns the `id` of the record the store hands back.
    ///
    /// # Errors
    ///
    /// Errors from the session store are converted with `RuntimeError::from` and
    /// propagated.
    pub async fn append_message(&self, session_id: Uuid, message: &Message) -> RuntimeResult<Uuid> {
        let stored = self
            .sessions
            .append_message(message_to_record(session_id, message))
            .await
            .map_err(RuntimeError::from)?;
        Ok(stored.id)
    }

    /// Loads the message records of `session_id` and converts them to provider
    /// [`Message`]s.
    ///
    /// Records for which `record_to_message` returns `None` are skipped.
    ///
    /// # Errors
    ///
    /// Errors from the session store are converted with `RuntimeError::from` and
    /// propagated.
    pub async fn list_messages(&self, session_id: Uuid) -> RuntimeResult<Vec<Message>> {
        let records = self
            .sessions
            .list_messages(&session_id)
            .await
            .map_err(RuntimeError::from)?;
        let messages: Vec<Message> = records
            .into_iter()
            .filter_map(record_to_message)
            .collect();
        Ok(messages)
    }
}
/// Converts a provider [`Message`] into a [`MessageRecord`] belonging to `session_id`.
///
/// The role mirrors the message variant and the content is cloned. Assistant
/// messages additionally carry over their tool calls; tool messages carry over the
/// tool call id and tool name.
fn message_to_record(session_id: Uuid, message: &Message) -> MessageRecord {
    match message {
        Message::System { content: text } => {
            MessageRecord::new(session_id, MessageRole::System, text.clone())
        }
        Message::User { content: text } => {
            MessageRecord::new(session_id, MessageRole::User, text.clone())
        }
        Message::Assistant {
            content: text,
            tool_calls: calls,
        } => {
            let base = MessageRecord::new(session_id, MessageRole::Assistant, text.clone());
            base.with_tool_calls(calls.clone())
        }
        Message::Tool {
            tool_call_id: call_id,
            name: tool,
            content: output,
        } => {
            let base = MessageRecord::new(session_id, MessageRole::Tool, output.clone());
            base.with_tool_result(call_id.clone(), tool.clone())
        }
    }
}
#[must_use]
pub fn record_to_message(record: MessageRecord) -> Option<Message> {
match record.role {
MessageRole::System => Some(Message::System {
content: record.content,
}),
MessageRole::User => Some(Message::User {
content: record.content,
}),
MessageRole::Assistant => Some(Message::Assistant {
content: record.content,
tool_calls: record.tool_calls,
}),
MessageRole::Tool => Some(Message::Tool {
tool_call_id: record.tool_call_id.unwrap_or_default(),
name: record.tool_name.unwrap_or_default(),
content: record.content,
}),
}
}