use crate::file_utils;
use async_trait::async_trait;
use serde::Deserialize;
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
use tirea_contract::storage::{
has_active_claim_for_mailbox, paginate_mailbox_entries, paginate_runs_in_memory, Committed,
MailboxEntry, MailboxInterrupt, MailboxPage, MailboxQuery, MailboxReader, MailboxState,
MailboxStoreError, MailboxWriter, RunPage, RunQuery, RunRecord, ThreadHead, ThreadListPage,
ThreadListQuery, ThreadReader, ThreadStoreError, ThreadWriter, VersionPrecondition,
};
use tirea_contract::{Thread, ThreadChangeSet, Version};
fn now_millis() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
pub struct FileStore {
base_path: PathBuf,
}
impl FileStore {
pub fn new(base_path: impl Into<PathBuf>) -> Self {
Self {
base_path: base_path.into(),
}
}
pub(super) fn thread_path(&self, thread_id: &str) -> Result<PathBuf, ThreadStoreError> {
Self::validate_thread_id(thread_id)?;
Ok(self.base_path.join(format!("{}.json", thread_id)))
}
fn validate_thread_id(thread_id: &str) -> Result<(), ThreadStoreError> {
file_utils::validate_fs_id(thread_id, "thread id").map_err(ThreadStoreError::InvalidId)
}
fn mailbox_dir(&self) -> PathBuf {
self.base_path.join("_mailbox")
}
fn mailbox_threads_dir(&self) -> PathBuf {
self.base_path.join("_mailbox_threads")
}
fn mailbox_path(&self, entry_id: &str) -> Result<PathBuf, MailboxStoreError> {
file_utils::validate_fs_id(entry_id, "mailbox entry id")
.map_err(MailboxStoreError::Backend)?;
Ok(self.mailbox_dir().join(format!("{entry_id}.json")))
}
fn mailbox_state_path(&self, mailbox_id: &str) -> Result<PathBuf, MailboxStoreError> {
file_utils::validate_fs_id(mailbox_id, "mailbox id").map_err(MailboxStoreError::Backend)?;
Ok(self
.mailbox_threads_dir()
.join(format!("{mailbox_id}.json")))
}
async fn save_mailbox_entry(&self, entry: &MailboxEntry) -> Result<(), MailboxStoreError> {
let payload = serde_json::to_string_pretty(entry)
.map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
let filename = format!("{}.json", entry.entry_id);
file_utils::atomic_json_write(&self.mailbox_dir(), &filename, &payload)
.await
.map_err(MailboxStoreError::Io)
}
async fn save_mailbox_state(&self, state: &MailboxState) -> Result<(), MailboxStoreError> {
let payload = serde_json::to_string_pretty(state)
.map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
let filename = format!("{}.json", state.mailbox_id);
file_utils::atomic_json_write(&self.mailbox_threads_dir(), &filename, &payload)
.await
.map_err(MailboxStoreError::Io)
}
async fn load_mailbox_state_inner(
&self,
mailbox_id: &str,
) -> Result<Option<MailboxState>, MailboxStoreError> {
let path = self.mailbox_state_path(mailbox_id)?;
if !path.exists() {
return Ok(None);
}
let content = tokio::fs::read_to_string(path)
.await
.map_err(MailboxStoreError::Io)?;
let state = serde_json::from_str(&content)
.map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
Ok(Some(state))
}
async fn load_all_mailbox_entries(&self) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
let dir = self.mailbox_dir();
if !dir.exists() {
return Ok(Vec::new());
}
let mut entries = tokio::fs::read_dir(&dir)
.await
.map_err(MailboxStoreError::Io)?;
let mut mailbox_entries = Vec::new();
while let Some(entry) = entries.next_entry().await.map_err(MailboxStoreError::Io)? {
let path = entry.path();
if path.extension().is_none_or(|ext| ext != "json") {
continue;
}
let content = tokio::fs::read_to_string(path)
.await
.map_err(MailboxStoreError::Io)?;
let mailbox_entry: MailboxEntry = serde_json::from_str(&content)
.map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
mailbox_entries.push(mailbox_entry);
}
Ok(mailbox_entries)
}
}
#[async_trait]
impl MailboxReader for FileStore {
async fn load_mailbox_entry(
&self,
entry_id: &str,
) -> Result<Option<MailboxEntry>, MailboxStoreError> {
let path = self.mailbox_path(entry_id)?;
if !path.exists() {
return Ok(None);
}
let content = tokio::fs::read_to_string(path)
.await
.map_err(MailboxStoreError::Io)?;
let entry: MailboxEntry = serde_json::from_str(&content)
.map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
Ok(Some(entry))
}
async fn load_mailbox_state(
&self,
mailbox_id: &str,
) -> Result<Option<MailboxState>, MailboxStoreError> {
self.load_mailbox_state_inner(mailbox_id).await
}
async fn list_mailbox_entries(
&self,
query: &MailboxQuery,
) -> Result<MailboxPage, MailboxStoreError> {
let entries = self.load_all_mailbox_entries().await?;
Ok(paginate_mailbox_entries(&entries, query))
}
}
#[async_trait]
impl MailboxWriter for FileStore {
async fn enqueue_mailbox_entry(&self, entry: &MailboxEntry) -> Result<(), MailboxStoreError> {
let path = self.mailbox_path(&entry.entry_id)?;
if path.exists() {
return Err(MailboxStoreError::AlreadyExists(entry.entry_id.clone()));
}
let mailbox_state = self
.load_mailbox_state_inner(&entry.mailbox_id)
.await?
.unwrap_or(MailboxState {
mailbox_id: entry.mailbox_id.clone(),
current_generation: entry.generation,
updated_at: entry.updated_at,
});
if mailbox_state.current_generation != entry.generation {
return Err(MailboxStoreError::GenerationMismatch {
mailbox_id: entry.mailbox_id.clone(),
expected: mailbox_state.current_generation,
actual: entry.generation,
});
}
if let Some(dedupe_key) = entry.dedupe_key.as_deref() {
let existing = self.load_all_mailbox_entries().await?;
if existing.iter().any(|current| {
current.mailbox_id == entry.mailbox_id
&& current.dedupe_key.as_deref() == Some(dedupe_key)
}) {
return Err(MailboxStoreError::AlreadyExists(dedupe_key.to_string()));
}
}
self.save_mailbox_state(&mailbox_state).await?;
self.save_mailbox_entry(entry).await
}
async fn ensure_mailbox_state(
&self,
mailbox_id: &str,
now: u64,
) -> Result<MailboxState, MailboxStoreError> {
let mut state = self
.load_mailbox_state_inner(mailbox_id)
.await?
.unwrap_or(MailboxState {
mailbox_id: mailbox_id.to_string(),
current_generation: 0,
updated_at: now,
});
state.updated_at = now;
self.save_mailbox_state(&state).await?;
Ok(state)
}
async fn claim_mailbox_entries(
&self,
mailbox_id: Option<&str>,
limit: usize,
consumer_id: &str,
now: u64,
lease_duration_ms: u64,
) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
let all_entries = self.load_all_mailbox_entries().await?;
let mut candidates: Vec<MailboxEntry> = all_entries
.iter()
.filter(|entry| entry.is_claimable(now))
.filter(|entry| mailbox_id.is_none_or(|id| entry.mailbox_id == id))
.cloned()
.collect();
candidates.sort_by(|left, right| {
right
.priority
.cmp(&left.priority)
.then_with(|| left.available_at.cmp(&right.available_at))
.then_with(|| left.created_at.cmp(&right.created_at))
.then_with(|| left.entry_id.cmp(&right.entry_id))
});
let mut claimed_mailbox_ids = std::collections::HashSet::new();
let mut claimed = Vec::new();
for mut entry in candidates {
if claimed.len() >= limit {
break;
}
if claimed_mailbox_ids.contains(&entry.mailbox_id)
|| has_active_claim_for_mailbox(
all_entries.iter(),
&entry.mailbox_id,
now,
Some(&entry.entry_id),
)
{
continue;
}
if let Some(ts) = self.load_mailbox_state_inner(&entry.mailbox_id).await? {
if entry.generation < ts.current_generation {
entry.status = tirea_contract::MailboxEntryStatus::Superseded;
entry.last_error =
Some("superseded by interrupt (reconciled on claim)".to_string());
entry.claim_token = None;
entry.claimed_by = None;
entry.lease_until = None;
entry.updated_at = now;
self.save_mailbox_entry(&entry).await?;
continue;
}
}
let mid = entry.mailbox_id.clone();
entry.status = tirea_contract::MailboxEntryStatus::Claimed;
entry.claim_token = Some(uuid::Uuid::now_v7().simple().to_string());
entry.claimed_by = Some(consumer_id.to_string());
entry.lease_until = Some(now.saturating_add(lease_duration_ms));
entry.attempt_count = entry.attempt_count.saturating_add(1);
entry.updated_at = now;
self.save_mailbox_entry(&entry).await?;
claimed.push(entry);
claimed_mailbox_ids.insert(mid);
}
Ok(claimed)
}
async fn claim_mailbox_entry(
&self,
entry_id: &str,
consumer_id: &str,
now: u64,
lease_duration_ms: u64,
) -> Result<Option<MailboxEntry>, MailboxStoreError> {
let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
return Ok(None);
};
if entry.status.is_terminal() {
return Ok(None);
}
if entry.status == tirea_contract::MailboxEntryStatus::Claimed
&& entry.lease_until.is_some_and(|lease| lease > now)
{
return Ok(None);
}
let all_entries = self.load_all_mailbox_entries().await?;
if has_active_claim_for_mailbox(all_entries.iter(), &entry.mailbox_id, now, Some(entry_id))
{
return Ok(None);
}
entry.status = tirea_contract::MailboxEntryStatus::Claimed;
entry.claim_token = Some(uuid::Uuid::now_v7().simple().to_string());
entry.claimed_by = Some(consumer_id.to_string());
entry.lease_until = Some(now.saturating_add(lease_duration_ms));
entry.attempt_count = entry.attempt_count.saturating_add(1);
entry.updated_at = now;
self.save_mailbox_entry(&entry).await?;
Ok(Some(entry))
}
async fn ack_mailbox_entry(
&self,
entry_id: &str,
claim_token: &str,
now: u64,
) -> Result<(), MailboxStoreError> {
let mut entry = self
.load_mailbox_entry(entry_id)
.await?
.ok_or_else(|| MailboxStoreError::NotFound(entry_id.to_string()))?;
if entry.claim_token.as_deref() != Some(claim_token) {
return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
}
entry.status = tirea_contract::MailboxEntryStatus::Accepted;
entry.claim_token = None;
entry.claimed_by = None;
entry.lease_until = None;
entry.updated_at = now;
self.save_mailbox_entry(&entry).await
}
async fn nack_mailbox_entry(
&self,
entry_id: &str,
claim_token: &str,
retry_at: u64,
error: &str,
now: u64,
) -> Result<(), MailboxStoreError> {
let mut entry = self
.load_mailbox_entry(entry_id)
.await?
.ok_or_else(|| MailboxStoreError::NotFound(entry_id.to_string()))?;
if entry.claim_token.as_deref() != Some(claim_token) {
return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
}
entry.status = tirea_contract::MailboxEntryStatus::Queued;
entry.available_at = retry_at;
entry.last_error = Some(error.to_string());
entry.claim_token = None;
entry.claimed_by = None;
entry.lease_until = None;
entry.updated_at = now;
self.save_mailbox_entry(&entry).await
}
async fn dead_letter_mailbox_entry(
&self,
entry_id: &str,
claim_token: &str,
error: &str,
now: u64,
) -> Result<(), MailboxStoreError> {
let mut entry = self
.load_mailbox_entry(entry_id)
.await?
.ok_or_else(|| MailboxStoreError::NotFound(entry_id.to_string()))?;
if entry.claim_token.as_deref() != Some(claim_token) {
return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
}
entry.status = tirea_contract::MailboxEntryStatus::DeadLetter;
entry.last_error = Some(error.to_string());
entry.claim_token = None;
entry.claimed_by = None;
entry.lease_until = None;
entry.updated_at = now;
self.save_mailbox_entry(&entry).await
}
async fn cancel_mailbox_entry(
&self,
entry_id: &str,
now: u64,
) -> Result<Option<MailboxEntry>, MailboxStoreError> {
let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
return Ok(None);
};
if entry.status.is_terminal() {
return Ok(Some(entry));
}
entry.status = tirea_contract::MailboxEntryStatus::Cancelled;
entry.last_error = Some("cancelled".to_string());
entry.claim_token = None;
entry.claimed_by = None;
entry.lease_until = None;
entry.updated_at = now;
self.save_mailbox_entry(&entry).await?;
Ok(Some(entry))
}
async fn supersede_mailbox_entry(
&self,
entry_id: &str,
now: u64,
reason: &str,
) -> Result<Option<MailboxEntry>, MailboxStoreError> {
let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
return Ok(None);
};
if entry.status.is_terminal() {
return Ok(Some(entry));
}
entry.status = tirea_contract::MailboxEntryStatus::Superseded;
entry.last_error = Some(reason.to_string());
entry.claim_token = None;
entry.claimed_by = None;
entry.lease_until = None;
entry.updated_at = now;
self.save_mailbox_entry(&entry).await?;
Ok(Some(entry))
}
async fn cancel_pending_for_mailbox(
&self,
mailbox_id: &str,
now: u64,
exclude_entry_id: Option<&str>,
) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
let entries = self.load_all_mailbox_entries().await?;
let mut cancelled = Vec::new();
for mut entry in entries {
if entry.mailbox_id != mailbox_id || entry.status.is_terminal() {
continue;
}
if exclude_entry_id.is_some_and(|eid| entry.entry_id == eid) {
continue;
}
entry.status = tirea_contract::MailboxEntryStatus::Cancelled;
entry.last_error = Some("cancelled".to_string());
entry.claim_token = None;
entry.claimed_by = None;
entry.lease_until = None;
entry.updated_at = now;
self.save_mailbox_entry(&entry).await?;
cancelled.push(entry);
}
Ok(cancelled)
}
async fn interrupt_mailbox(
&self,
mailbox_id: &str,
now: u64,
) -> Result<MailboxInterrupt, MailboxStoreError> {
let mut state = self
.load_mailbox_state_inner(mailbox_id)
.await?
.unwrap_or(MailboxState {
mailbox_id: mailbox_id.to_string(),
current_generation: 0,
updated_at: now,
});
state.current_generation = state.current_generation.saturating_add(1);
state.updated_at = now;
self.save_mailbox_state(&state).await?;
let entries = self.load_all_mailbox_entries().await?;
let mut superseded = Vec::new();
for mut entry in entries {
if entry.mailbox_id != mailbox_id || entry.status.is_terminal() {
continue;
}
if entry.generation >= state.current_generation {
continue;
}
entry.status = tirea_contract::MailboxEntryStatus::Superseded;
entry.last_error = Some("superseded by interrupt".to_string());
entry.claim_token = None;
entry.claimed_by = None;
entry.lease_until = None;
entry.updated_at = now;
self.save_mailbox_entry(&entry).await?;
superseded.push(entry);
}
Ok(MailboxInterrupt {
mailbox_state: state,
superseded_entries: superseded,
})
}
async fn extend_lease(
&self,
entry_id: &str,
claim_token: &str,
extension_ms: u64,
now: u64,
) -> Result<bool, MailboxStoreError> {
let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
return Ok(false);
};
if entry.status != tirea_contract::MailboxEntryStatus::Claimed {
return Ok(false);
}
if entry.claim_token.as_deref() != Some(claim_token) {
return Ok(false);
}
entry.lease_until = Some(now.saturating_add(extension_ms));
entry.updated_at = now;
self.save_mailbox_entry(&entry).await?;
Ok(true)
}
async fn purge_terminal_mailbox_entries(
&self,
older_than: u64,
) -> Result<usize, MailboxStoreError> {
let entries = self.load_all_mailbox_entries().await?;
let mut count = 0usize;
for entry in entries {
if entry.status.is_terminal() && entry.updated_at < older_than {
let path = self.mailbox_dir().join(format!("{}.json", entry.entry_id));
if path.exists() {
tokio::fs::remove_file(&path)
.await
.map_err(MailboxStoreError::Io)?;
count += 1;
}
}
}
Ok(count)
}
}
#[async_trait]
impl ThreadWriter for FileStore {
async fn create(&self, thread: &Thread) -> Result<Committed, ThreadStoreError> {
let path = self.thread_path(&thread.id)?;
if path.exists() {
return Err(ThreadStoreError::AlreadyExists);
}
let mut thread = thread.clone();
let now = now_millis();
if thread.metadata.created_at.is_none() {
thread.metadata.created_at = Some(now);
}
thread.metadata.updated_at = Some(now);
let head = ThreadHead { thread, version: 0 };
self.save_head(&head).await?;
Ok(Committed { version: 0 })
}
async fn append(
&self,
thread_id: &str,
delta: &ThreadChangeSet,
precondition: VersionPrecondition,
) -> Result<Committed, ThreadStoreError> {
let head = self
.load_head(thread_id)
.await?
.ok_or_else(|| ThreadStoreError::NotFound(thread_id.to_string()))?;
if let VersionPrecondition::Exact(expected) = precondition {
if head.version != expected {
return Err(ThreadStoreError::VersionConflict {
expected,
actual: head.version,
});
}
}
let mut thread = head.thread;
delta.apply_to(&mut thread);
thread.metadata.updated_at = Some(now_millis());
let new_version = head.version + 1;
let new_head = ThreadHead {
thread,
version: new_version,
};
self.save_head(&new_head).await?;
self.upsert_run_from_changeset(thread_id, delta).await?;
Ok(Committed {
version: new_version,
})
}
async fn delete(&self, thread_id: &str) -> Result<(), ThreadStoreError> {
let path = self.thread_path(thread_id)?;
if path.exists() {
tokio::fs::remove_file(&path).await?;
}
Ok(())
}
async fn save(&self, thread: &Thread) -> Result<(), ThreadStoreError> {
let next_version = self
.load_head(&thread.id)
.await?
.map_or(0, |head| head.version.saturating_add(1));
let mut thread = thread.clone();
let now = now_millis();
thread.metadata.updated_at = Some(now);
if thread.metadata.created_at.is_none() {
thread.metadata.created_at = Some(now);
}
let head = ThreadHead {
thread,
version: next_version,
};
self.save_head(&head).await
}
}
#[async_trait]
impl ThreadReader for FileStore {
async fn load(&self, thread_id: &str) -> Result<Option<ThreadHead>, ThreadStoreError> {
self.load_head(thread_id).await
}
async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, ThreadStoreError> {
self.load_run_record(run_id).await
}
async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, ThreadStoreError> {
let records = self.load_all_run_records().await?;
Ok(paginate_runs_in_memory(&records, query))
}
async fn active_run_for_thread(
&self,
thread_id: &str,
) -> Result<Option<RunRecord>, ThreadStoreError> {
let records = self.load_all_run_records().await?;
Ok(records
.into_iter()
.filter(|r| r.thread_id == thread_id && !r.status.is_terminal())
.max_by(|a, b| {
a.created_at
.cmp(&b.created_at)
.then_with(|| a.updated_at.cmp(&b.updated_at))
.then_with(|| a.run_id.cmp(&b.run_id))
}))
}
async fn list_threads(
&self,
query: &ThreadListQuery,
) -> Result<ThreadListPage, ThreadStoreError> {
let mut all = file_utils::scan_json_stems(&self.base_path).await?;
if let Some(ref resource_id) = query.resource_id {
let mut filtered = Vec::new();
for id in &all {
if let Some(head) = self.load(id).await? {
if head.thread.resource_id.as_deref() == Some(resource_id.as_str()) {
filtered.push(id.clone());
}
}
}
all = filtered;
}
if let Some(ref parent_thread_id) = query.parent_thread_id {
let mut filtered = Vec::new();
for id in &all {
if let Some(head) = self.load(id).await? {
if head.thread.parent_thread_id.as_deref() == Some(parent_thread_id.as_str()) {
filtered.push(id.clone());
}
}
}
all = filtered;
}
all.sort();
let total = all.len();
let limit = query.limit.clamp(1, 200);
let offset = query.offset.min(total);
let end = (offset + limit + 1).min(total);
let slice = &all[offset..end];
let has_more = slice.len() > limit;
let items: Vec<String> = slice.iter().take(limit).cloned().collect();
Ok(ThreadListPage {
items,
total,
has_more,
})
}
}
impl FileStore {
fn runs_dir(&self) -> PathBuf {
self.base_path.join("_runs")
}
fn run_path(&self, run_id: &str) -> Result<PathBuf, ThreadStoreError> {
file_utils::validate_fs_id(run_id, "run id").map_err(ThreadStoreError::InvalidId)?;
Ok(self.runs_dir().join(format!("{run_id}.json")))
}
async fn save_run_record(&self, record: &RunRecord) -> Result<(), ThreadStoreError> {
let payload = serde_json::to_string_pretty(record)
.map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
let filename = format!("{}.json", record.run_id);
file_utils::atomic_json_write(&self.runs_dir(), &filename, &payload)
.await
.map_err(ThreadStoreError::Io)
}
async fn load_run_record(&self, run_id: &str) -> Result<Option<RunRecord>, ThreadStoreError> {
let path = self.run_path(run_id)?;
if !path.exists() {
return Ok(None);
}
let content = tokio::fs::read_to_string(path).await?;
let record: RunRecord = serde_json::from_str(&content)
.map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
Ok(Some(record))
}
async fn load_all_run_records(&self) -> Result<Vec<RunRecord>, ThreadStoreError> {
let dir = self.runs_dir();
if !dir.exists() {
return Ok(Vec::new());
}
let mut entries = tokio::fs::read_dir(&dir).await?;
let mut records = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().is_none_or(|ext| ext != "json") {
continue;
}
let content = tokio::fs::read_to_string(path).await?;
let record: RunRecord = serde_json::from_str(&content)
.map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
records.push(record);
}
Ok(records)
}
async fn upsert_run_from_changeset(
&self,
thread_id: &str,
delta: &ThreadChangeSet,
) -> Result<(), ThreadStoreError> {
if delta.run_id.is_empty() {
return Ok(());
}
let now = now_millis();
if let Some(meta) = &delta.run_meta {
let mut record = self
.load_run_record(&delta.run_id)
.await?
.unwrap_or_else(|| {
RunRecord::new(
&delta.run_id,
thread_id,
&meta.agent_id,
meta.origin,
meta.status,
now,
)
});
record.input_tokens = meta.input_tokens;
record.output_tokens = meta.output_tokens;
record.status = meta.status;
record.agent_id.clone_from(&meta.agent_id);
record.origin = meta.origin;
record.thread_id = thread_id.to_string();
if record.parent_run_id.is_none() {
record.parent_run_id.clone_from(&delta.parent_run_id);
}
if record.parent_thread_id.is_none() {
record.parent_thread_id.clone_from(&meta.parent_thread_id);
}
record.termination_code.clone_from(&meta.termination_code);
record
.termination_detail
.clone_from(&meta.termination_detail);
if record.source_mailbox_entry_id.is_none() {
record
.source_mailbox_entry_id
.clone_from(&meta.source_mailbox_entry_id);
}
record.updated_at = now;
self.save_run_record(&record).await?;
} else if let Some(mut record) = self.load_run_record(&delta.run_id).await? {
record.updated_at = now;
self.save_run_record(&record).await?;
}
Ok(())
}
async fn load_head(&self, thread_id: &str) -> Result<Option<ThreadHead>, ThreadStoreError> {
let path = self.thread_path(thread_id)?;
if !path.exists() {
return Ok(None);
}
let content = tokio::fs::read_to_string(&path).await?;
if let Ok(head) = serde_json::from_str::<VersionedThread>(&content) {
let thread: Thread = serde_json::from_str(&content)
.map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
Ok(Some(ThreadHead {
thread,
version: head._version.unwrap_or(0),
}))
} else {
let thread: Thread = serde_json::from_str(&content)
.map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
Ok(Some(ThreadHead { thread, version: 0 }))
}
}
async fn save_head(&self, head: &ThreadHead) -> Result<(), ThreadStoreError> {
let mut v = serde_json::to_value(&head.thread)
.map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
if let Some(obj) = v.as_object_mut() {
obj.insert("_version".to_string(), serde_json::json!(head.version));
}
let content = serde_json::to_string_pretty(&v)
.map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
let filename = format!("{}.json", head.thread.id);
file_utils::atomic_json_write(&self.base_path, &filename, &content)
.await
.map_err(ThreadStoreError::Io)
}
}
#[derive(Deserialize)]
struct VersionedThread {
#[serde(default)]
_version: Option<Version>,
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use std::sync::Arc;
use tempfile::TempDir;
use tirea_contract::{
storage::{MailboxEntryStatus, MailboxReader, MailboxWriter, ThreadReader},
testing::MailboxEntryBuilder,
CheckpointReason, Message, MessageQuery, ThreadWriter,
};
use tirea_state::{path, Op, Patch, TrackedPatch};
fn make_thread_with_messages(thread_id: &str, n: usize) -> Thread {
let mut thread = Thread::new(thread_id);
for i in 0..n {
thread = thread.with_message(Message::user(format!("msg-{i}")));
}
thread
}
#[tokio::test]
async fn file_storage_save_load_roundtrip() {
let temp_dir = TempDir::new().unwrap();
let storage = FileStore::new(temp_dir.path());
let thread = Thread::new("test-1").with_message(Message::user("hello"));
storage.save(&thread).await.unwrap();
let loaded = storage.load_thread("test-1").await.unwrap().unwrap();
assert_eq!(loaded.id, "test-1");
assert_eq!(loaded.message_count(), 1);
}
#[tokio::test]
async fn file_storage_list_and_delete() {
let temp_dir = TempDir::new().unwrap();
let storage = FileStore::new(temp_dir.path());
storage.create(&Thread::new("thread-a")).await.unwrap();
storage.create(&Thread::new("thread-b")).await.unwrap();
storage.create(&Thread::new("thread-c")).await.unwrap();
let mut ids = storage.list().await.unwrap();
ids.sort();
assert_eq!(ids, vec!["thread-a", "thread-b", "thread-c"]);
storage.delete("thread-b").await.unwrap();
let mut ids = storage.list().await.unwrap();
ids.sort();
assert_eq!(ids, vec!["thread-a", "thread-c"]);
}
#[tokio::test]
async fn file_storage_message_queries() {
let temp_dir = TempDir::new().unwrap();
let storage = FileStore::new(temp_dir.path());
let thread = make_thread_with_messages("t1", 10);
storage.save(&thread).await.unwrap();
let page = storage
.load_messages(
"t1",
&MessageQuery {
after: Some(4),
limit: 3,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(page.messages.len(), 3);
assert_eq!(page.messages[0].cursor, 5);
assert_eq!(page.messages[0].message.content, "msg-5");
assert_eq!(storage.message_count("t1").await.unwrap(), 10);
}
#[tokio::test]
async fn file_storage_append_and_versioning() {
let temp_dir = TempDir::new().unwrap();
let store = FileStore::new(temp_dir.path());
store.create(&Thread::new("t1")).await.unwrap();
let d1 = ThreadChangeSet {
run_id: "run-1".to_string(),
parent_run_id: None,
run_meta: None,
reason: CheckpointReason::UserMessage,
messages: vec![Arc::new(Message::user("hello"))],
patches: vec![],
state_actions: vec![],
snapshot: None,
};
let c1 = store
.append("t1", &d1, VersionPrecondition::Exact(0))
.await
.unwrap();
assert_eq!(c1.version, 1);
let d2 = ThreadChangeSet {
run_id: "run-1".to_string(),
parent_run_id: None,
run_meta: None,
reason: CheckpointReason::AssistantTurnCommitted,
messages: vec![Arc::new(Message::assistant("hi"))],
patches: vec![TrackedPatch::new(
Patch::new().with_op(Op::set(path!("greeted"), json!(true))),
)],
state_actions: vec![],
snapshot: None,
};
let c2 = store
.append("t1", &d2, VersionPrecondition::Exact(1))
.await
.unwrap();
assert_eq!(c2.version, 2);
let d3 = ThreadChangeSet {
run_id: "run-1".to_string(),
parent_run_id: None,
run_meta: None,
reason: CheckpointReason::RunFinished,
messages: vec![],
patches: vec![],
state_actions: vec![],
snapshot: Some(json!({"greeted": true})),
};
let c3 = store
.append("t1", &d3, VersionPrecondition::Exact(2))
.await
.unwrap();
assert_eq!(c3.version, 3);
let store2 = FileStore::new(temp_dir.path());
let head = store2.load("t1").await.unwrap().unwrap();
assert_eq!(head.version, 3);
assert_eq!(head.thread.message_count(), 2);
assert!(head.thread.patches.is_empty());
assert_eq!(head.thread.state, json!({"greeted": true}));
}
#[tokio::test]
async fn file_storage_tool_call_message_roundtrip() {
let temp_dir = TempDir::new().unwrap();
let storage = FileStore::new(temp_dir.path());
let tool_call = tirea_contract::ToolCall::new("call_1", "search", json!({"query": "rust"}));
let thread = Thread::new("tool-rt")
.with_message(Message::user("Find info about Rust"))
.with_message(Message::assistant_with_tool_calls(
"Let me search for that.",
vec![tool_call],
))
.with_message(Message::tool(
"call_1",
r#"{"result": "Rust is a language"}"#,
))
.with_message(Message::assistant(
"Rust is a systems programming language.",
));
storage.save(&thread).await.unwrap();
let loaded = storage.load_thread("tool-rt").await.unwrap().unwrap();
assert_eq!(loaded.message_count(), 4);
let assistant_msg = &loaded.messages[1];
assert_eq!(assistant_msg.role, tirea_contract::Role::Assistant);
let calls = assistant_msg.tool_calls.as_ref().expect("tool_calls lost");
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].id, "call_1");
assert_eq!(calls[0].name, "search");
assert_eq!(calls[0].arguments, json!({"query": "rust"}));
let tool_msg = &loaded.messages[2];
assert_eq!(tool_msg.role, tirea_contract::Role::Tool);
assert_eq!(tool_msg.tool_call_id.as_deref(), Some("call_1"));
assert_eq!(tool_msg.content, r#"{"result": "Rust is a language"}"#);
}
#[tokio::test]
async fn file_storage_tool_call_message_roundtrip_via_append() {
let temp_dir = TempDir::new().unwrap();
let store = FileStore::new(temp_dir.path());
store.create(&Thread::new("tool-append")).await.unwrap();
let tool_call =
tirea_contract::ToolCall::new("call_42", "calculator", json!({"expr": "6*7"}));
let delta = ThreadChangeSet {
run_id: "run-1".to_string(),
parent_run_id: None,
run_meta: None,
reason: CheckpointReason::AssistantTurnCommitted,
messages: vec![
Arc::new(Message::assistant_with_tool_calls(
"Calculating...",
vec![tool_call],
)),
Arc::new(Message::tool("call_42", r#"{"answer": 42}"#)),
],
patches: vec![],
state_actions: vec![],
snapshot: None,
};
store
.append("tool-append", &delta, VersionPrecondition::Exact(0))
.await
.unwrap();
let head = store.load("tool-append").await.unwrap().unwrap();
assert_eq!(head.thread.message_count(), 2);
let calls = head.thread.messages[0]
.tool_calls
.as_ref()
.expect("tool_calls lost after append");
assert_eq!(calls[0].id, "call_42");
assert_eq!(calls[0].name, "calculator");
assert_eq!(
head.thread.messages[1].tool_call_id.as_deref(),
Some("call_42")
);
}
#[tokio::test]
async fn file_storage_timestamps_populated() {
let temp_dir = TempDir::new().unwrap();
let store = FileStore::new(temp_dir.path());
store.create(&Thread::new("ts-1")).await.unwrap();
let head = store.load("ts-1").await.unwrap().unwrap();
assert!(head.thread.metadata.created_at.is_some());
assert!(head.thread.metadata.updated_at.is_some());
let created = head.thread.metadata.created_at.unwrap();
let updated = head.thread.metadata.updated_at.unwrap();
assert!(created > 0);
assert_eq!(created, updated);
let delta = ThreadChangeSet {
run_id: "run-1".to_string(),
parent_run_id: None,
run_meta: None,
reason: CheckpointReason::UserMessage,
messages: vec![Arc::new(Message::user("hello"))],
patches: vec![],
state_actions: vec![],
snapshot: None,
};
store
.append("ts-1", &delta, VersionPrecondition::Exact(0))
.await
.unwrap();
let head = store.load("ts-1").await.unwrap().unwrap();
assert!(head.thread.metadata.updated_at.unwrap() >= updated);
assert_eq!(head.thread.metadata.created_at.unwrap(), created);
let thread = Thread::new("ts-2");
assert!(thread.metadata.created_at.is_none());
store.save(&thread).await.unwrap();
let head = store.load("ts-2").await.unwrap().unwrap();
assert!(head.thread.metadata.created_at.is_some());
assert!(head.thread.metadata.updated_at.is_some());
}
#[test]
fn file_storage_rejects_path_traversal() {
let storage = FileStore::new("/base/path");
assert!(storage.thread_path("../../etc/passwd").is_err());
assert!(storage.thread_path("foo/bar").is_err());
assert!(storage.thread_path("foo\\bar").is_err());
assert!(storage.thread_path("").is_err());
assert!(storage.thread_path("foo\0bar").is_err());
}
#[tokio::test]
async fn file_storage_mailbox_claim_and_cancel_roundtrip() {
let temp_dir = TempDir::new().unwrap();
let storage = FileStore::new(temp_dir.path());
let entry = MailboxEntryBuilder::queued("entry-file-mailbox", "mailbox-file-mailbox")
.with_payload(json!({"message": "hello"}))
.build();
storage.enqueue_mailbox_entry(&entry).await.unwrap();
let claimed = storage
.claim_mailbox_entries(None, 1, "worker-file", 10, 5_000)
.await
.unwrap();
assert_eq!(claimed.len(), 1);
assert_eq!(claimed[0].status, MailboxEntryStatus::Claimed);
let cancelled = storage
.cancel_mailbox_entry("entry-file-mailbox", 20)
.await
.unwrap()
.expect("queued entry should be cancellable");
assert_eq!(cancelled.status, MailboxEntryStatus::Cancelled);
let loaded = storage
.load_mailbox_entry(&entry.entry_id)
.await
.unwrap()
.expect("mailbox entry should persist");
assert_eq!(loaded.status, MailboxEntryStatus::Cancelled);
}
#[tokio::test]
async fn file_storage_mailbox_claim_by_entry_id_ignores_available_at() {
let temp_dir = TempDir::new().unwrap();
let storage = FileStore::new(temp_dir.path());
let entry = MailboxEntryBuilder::queued("entry-file-inline", "mailbox-file-inline")
.with_payload(json!({"message": "hello"}))
.with_available_at(i64::MAX as u64)
.build();
storage.enqueue_mailbox_entry(&entry).await.unwrap();
let claimed = storage
.claim_mailbox_entries(None, 1, "worker-file-batch", 10, 5_000)
.await
.unwrap();
assert!(claimed.is_empty());
let targeted = storage
.claim_mailbox_entry("entry-file-inline", "worker-file-inline", 10, 5_000)
.await
.unwrap()
.expect("inline claim should succeed");
assert_eq!(targeted.status, MailboxEntryStatus::Claimed);
assert_eq!(targeted.claimed_by.as_deref(), Some("worker-file-inline"));
}
#[tokio::test]
async fn file_storage_mailbox_interrupt_bumps_generation_and_supersedes_entries() {
let temp_dir = TempDir::new().unwrap();
let storage = FileStore::new(temp_dir.path());
let old_a = MailboxEntryBuilder::queued("entry-file-old-a", "mailbox-file-interrupt")
.with_payload(json!({"message": "hello"}))
.build();
let old_b = MailboxEntryBuilder::queued("entry-file-old-b", "mailbox-file-interrupt")
.with_payload(json!({"message": "hello"}))
.build();
storage.enqueue_mailbox_entry(&old_a).await.unwrap();
storage.enqueue_mailbox_entry(&old_b).await.unwrap();
let interrupted = storage
.interrupt_mailbox("mailbox-file-interrupt", 50)
.await
.unwrap();
assert_eq!(interrupted.mailbox_state.current_generation, 1);
assert_eq!(interrupted.superseded_entries.len(), 2);
let superseded = storage
.load_mailbox_entry("entry-file-old-a")
.await
.unwrap()
.expect("superseded entry should exist");
assert_eq!(superseded.status, MailboxEntryStatus::Superseded);
let next_generation = storage
.ensure_mailbox_state("mailbox-file-interrupt", 60)
.await
.unwrap()
.current_generation;
let fresh = MailboxEntryBuilder::queued("entry-file-fresh", "mailbox-file-interrupt")
.with_payload(json!({"message": "hello"}))
.with_generation(next_generation)
.build();
storage.enqueue_mailbox_entry(&fresh).await.unwrap();
let fresh_loaded = storage
.load_mailbox_entry("entry-file-fresh")
.await
.unwrap()
.expect("fresh entry should exist");
assert_eq!(fresh_loaded.generation, 1);
assert_eq!(fresh_loaded.status, MailboxEntryStatus::Queued);
}
#[tokio::test]
async fn file_storage_mailbox_rejects_duplicate_dedupe_key_in_same_mailbox() {
let temp_dir = TempDir::new().unwrap();
let storage = FileStore::new(temp_dir.path());
let first = MailboxEntryBuilder::queued("entry-file-dedupe-1", "mailbox-file-dedupe")
.with_dedupe_key("dup-key")
.build();
let duplicate = MailboxEntryBuilder::queued("entry-file-dedupe-2", "mailbox-file-dedupe")
.with_dedupe_key("dup-key")
.build();
storage.enqueue_mailbox_entry(&first).await.unwrap();
let result = storage.enqueue_mailbox_entry(&duplicate).await;
assert!(matches!(result, Err(MailboxStoreError::AlreadyExists(_))));
}
}