use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use fjall::{Database, Keyspace, KeyspaceCreateOptions};
use fs2::FileExt;
use thiserror::Error;
use super::COMMS_SCHEMA_VER;
use super::ids::{AgentId, RoomId};
use super::keys;
use super::model::{AgentRecord, MessageBody, MessageMeta, Room, Subscription, now_micros};
/// Key in the `meta` keyspace under which the store's schema version is written.
const META_SCHEMA_VER: &[u8] = b"schema_ver";
/// Name of the database directory created inside the comms directory.
const STORE_DIR: &str = "store.fjall";
/// Name of the lock file created inside the comms directory.
const LOCK_FILE: &str = ".lock";
/// Number of times `acquire_lock` tries to take the lock before giving up.
const LOCK_ATTEMPTS: u32 = 25;
/// Pause between two consecutive lock attempts.
const LOCK_BACKOFF: std::time::Duration = std::time::Duration::from_millis(20);
/// Errors returned by [`CommsStore`] and the helpers in this file.
#[derive(Debug, Error)]
pub enum CommsStoreError {
/// A database operation failed inside fjall.
#[error("fjall error: {0}")]
Fjall(#[from] fjall::Error),
/// A filesystem operation failed; `path` is the file or directory involved.
#[error("io error on {path}: {source}")]
Io {
path: PathBuf,
#[source]
source: std::io::Error,
},
/// A value could not be serialized to MessagePack.
#[error("msgpack encode error: {0}")]
Encode(#[from] rmp_serde::encode::Error),
/// A stored value could not be deserialized from MessagePack.
#[error("msgpack decode error: {0}")]
Decode(#[from] rmp_serde::decode::Error),
/// The exclusive lock on the lock file (path attached) could not be acquired
/// within the retry budget.
#[error("another basemind comms daemon holds the lock on {0}")]
Locked(PathBuf),
}
/// Handle to the comms store: one fjall database plus the keyspaces opened on it.
///
/// Constructed by [`CommsStore::open`].
pub struct CommsStore {
/// Database handle; used to build write batches that span several keyspaces.
db: Database,
/// Schema version and per-room sequence counters.
meta: Keyspace,
/// MessagePack-encoded `Room` records.
rooms: Keyspace,
/// MessagePack-encoded `MessageMeta` records, keyed by room and sequence number.
messages_by_room: Keyspace,
/// MessagePack-encoded `MessageBody` values, keyed by message id.
message_body: Keyspace,
/// MessagePack-encoded `Subscription` records, keyed by room and agent.
subs_by_room: Keyspace,
/// Read cursors stored as big-endian `u64`, keyed by agent and room.
cursors: Keyspace,
/// MessagePack-encoded `AgentRecord` values.
agents: Keyspace,
/// Open handle to the lock file returned by `acquire_lock`; never read, only
/// kept alive for as long as the store exists.
_lock: File,
}
impl CommsStore {
pub fn open(comms_dir: &Path) -> Result<Self, CommsStoreError> {
std::fs::create_dir_all(comms_dir).map_err(|source| CommsStoreError::Io {
path: comms_dir.to_path_buf(),
source,
})?;
let lock = acquire_lock(comms_dir)?;
let dir = comms_dir.join(STORE_DIR);
let needs_wipe = match peek_schema_version(&dir) {
Some(ver) if ver == COMMS_SCHEMA_VER => false,
None => false, Some(_) => true,
};
if needs_wipe && dir.exists() {
std::fs::remove_dir_all(&dir).map_err(|source| CommsStoreError::Io {
path: dir.clone(),
source,
})?;
}
std::fs::create_dir_all(&dir).map_err(|source| CommsStoreError::Io {
path: dir.clone(),
source,
})?;
let db = Database::builder(&dir).open()?;
let meta = db.keyspace("meta", KeyspaceCreateOptions::default)?;
let rooms = db.keyspace("rooms", KeyspaceCreateOptions::default)?;
let messages_by_room = db.keyspace("messages_by_room", KeyspaceCreateOptions::default)?;
let message_body = db.keyspace("message_body", KeyspaceCreateOptions::default)?;
let subs_by_room = db.keyspace("subs_by_room", KeyspaceCreateOptions::default)?;
let cursors = db.keyspace("cursors", KeyspaceCreateOptions::default)?;
let agents = db.keyspace("agents", KeyspaceCreateOptions::default)?;
meta.insert(META_SCHEMA_VER, COMMS_SCHEMA_VER.to_be_bytes())?;
Ok(Self {
db,
meta,
rooms,
messages_by_room,
message_body,
subs_by_room,
cursors,
agents,
_lock: lock,
})
}
/// Encodes `room` as named MessagePack and stores it under the key derived
/// from its room id.
///
/// # Errors
///
/// Returns `Encode` if serialization fails and `Fjall` if the insert fails.
pub fn put_room(&self, room: &Room) -> Result<(), CommsStoreError> {
    let encoded = rmp_serde::to_vec_named(room)?;
    let key = keys::room_key(room.room_id.as_str());
    self.rooms.insert(key, encoded)?;
    Ok(())
}
/// Looks up the room stored for `room`.
///
/// Returns `Ok(None)` when no record exists for that id.
///
/// # Errors
///
/// Returns `Fjall` if the read fails and `Decode` if the stored bytes cannot
/// be deserialized.
pub fn get_room(&self, room: &RoomId) -> Result<Option<Room>, CommsStoreError> {
    let key = keys::room_key(room.as_str());
    let Some(raw) = self.rooms.get(key)? else {
        return Ok(None);
    };
    Ok(Some(rmp_serde::from_slice(&raw)?))
}
/// Decodes and returns every record in the `rooms` keyspace.
///
/// # Errors
///
/// Stops at the first entry that cannot be read (`Fjall`) or decoded
/// (`Decode`) and returns that error.
pub fn list_rooms(&self) -> Result<Vec<Room>, CommsStoreError> {
    self.rooms
        .iter()
        .map(|entry| -> Result<Room, CommsStoreError> {
            let (_, raw) = entry.into_inner()?;
            Ok(rmp_serde::from_slice(&raw)?)
        })
        .collect()
}
/// Encodes `agent` as named MessagePack and stores it under the key derived
/// from its agent id.
///
/// # Errors
///
/// Returns `Encode` if serialization fails and `Fjall` if the insert fails.
pub fn put_agent(&self, agent: &AgentRecord) -> Result<(), CommsStoreError> {
    let encoded = rmp_serde::to_vec_named(agent)?;
    let key = keys::agent_key(agent.agent_id.as_str());
    self.agents.insert(key, encoded)?;
    Ok(())
}
/// Looks up the record stored for `agent`.
///
/// Returns `Ok(None)` when no record exists for that id.
///
/// # Errors
///
/// Returns `Fjall` if the read fails and `Decode` if the stored bytes cannot
/// be deserialized.
pub fn get_agent(&self, agent: &AgentId) -> Result<Option<AgentRecord>, CommsStoreError> {
    let key = keys::agent_key(agent.as_str());
    let Some(raw) = self.agents.get(key)? else {
        return Ok(None);
    };
    Ok(Some(rmp_serde::from_slice(&raw)?))
}
/// Decodes and returns every record in the `agents` keyspace.
///
/// # Errors
///
/// Stops at the first entry that cannot be read (`Fjall`) or decoded
/// (`Decode`) and returns that error.
pub fn list_agents(&self) -> Result<Vec<AgentRecord>, CommsStoreError> {
    self.agents
        .iter()
        .map(|entry| -> Result<AgentRecord, CommsStoreError> {
            let (_, raw) = entry.into_inner()?;
            Ok(rmp_serde::from_slice(&raw)?)
        })
        .collect()
}
/// Stores `sub` under the key built from its room and agent id.
///
/// # Errors
///
/// Returns `Encode` if serialization fails and `Fjall` if the insert fails.
pub fn subscribe(&self, sub: &Subscription) -> Result<(), CommsStoreError> {
    let (room, agent) = (sub.room.as_str(), sub.agent_id.as_str());
    let key = keys::sub_by_room(room, agent);
    let encoded = rmp_serde::to_vec_named(sub)?;
    self.subs_by_room.insert(key, encoded)?;
    Ok(())
}
/// Removes the subscription entry keyed by `room` and `agent`.
///
/// # Errors
///
/// Returns `Fjall` if the removal fails.
pub fn unsubscribe(&self, room: &RoomId, agent: &AgentId) -> Result<(), CommsStoreError> {
    self.subs_by_room
        .remove(keys::sub_by_room(room.as_str(), agent.as_str()))?;
    Ok(())
}
/// Lists the agents subscribed to `room`.
///
/// Scans the subscription keys under the room's prefix and parses the agent
/// id out of each key; the stored values are not decoded. Keys that do not
/// parse, or whose agent part is not a valid `AgentId`, are skipped.
///
/// # Errors
///
/// Returns `Fjall` if reading an entry fails.
pub fn subscribers(&self, room: &RoomId) -> Result<Vec<AgentId>, CommsStoreError> {
    let prefix = keys::subs_by_room_prefix(room.as_str());
    let mut agents = Vec::new();
    for entry in self.subs_by_room.prefix(prefix) {
        let (key, _) = entry.into_inner()?;
        let Some((_, agent)) = keys::parse_sub_by_room(&key) else {
            continue;
        };
        if let Ok(id) = AgentId::parse(agent) {
            agents.push(id);
        }
    }
    Ok(agents)
}
/// Lists the rooms `agent` is subscribed to.
///
/// Walks every key in the subscription keyspace and keeps those whose agent
/// part equals `agent`. Keys that do not parse, or whose room part is not a
/// valid `RoomId`, are skipped.
///
/// # Errors
///
/// Returns `Fjall` if reading an entry fails.
pub fn rooms_for_agent(&self, agent: &AgentId) -> Result<Vec<RoomId>, CommsStoreError> {
    let mut rooms = Vec::new();
    for entry in self.subs_by_room.iter() {
        let (key, _) = entry.into_inner()?;
        let Some((room, owner)) = keys::parse_sub_by_room(&key) else {
            continue;
        };
        if owner != agent.as_str() {
            continue;
        }
        if let Ok(id) = RoomId::parse(room) {
            rooms.push(id);
        }
    }
    Ok(rooms)
}
/// Reads the last sequence number recorded for `room` in the `meta` keyspace.
///
/// Returns 0 when no counter is stored or when the stored value is not
/// exactly eight bytes long.
///
/// # Errors
///
/// Returns `Fjall` if the read fails.
fn current_seq(&self, room: &RoomId) -> Result<u64, CommsStoreError> {
    let key = keys::room_seq_meta_key(room.as_str());
    let stored = self.meta.get(&key)?;
    let seq = stored
        // The conversion succeeds only for an 8-byte value.
        .and_then(|raw| <[u8; 8]>::try_from(&raw[..]).ok())
        .map_or(0, u64::from_be_bytes);
    Ok(seq)
}
/// Appends a message to `room` and returns the sequence number it was given
/// together with the metadata passed in.
///
/// The new sequence number is the room's current counter plus one
/// (saturating). The updated counter, the encoded metadata and the encoded
/// body are written through a single batch and committed together.
///
/// # Errors
///
/// Returns `Fjall` if reading the counter or committing the batch fails, and
/// `Encode` if `meta` or `body` cannot be serialized; nothing is committed
/// in that case.
pub fn post(
    &self,
    room: &RoomId,
    meta: MessageMeta,
    body: MessageBody,
) -> Result<(u64, MessageMeta), CommsStoreError> {
    let room_name = room.as_str();
    let seq = self.current_seq(room)?.saturating_add(1);
    let encoded_meta = rmp_serde::to_vec_named(&meta)?;
    let encoded_body = rmp_serde::to_vec_named(&body)?;

    let mut batch = self.db.batch();
    batch.insert(
        &self.meta,
        keys::room_seq_meta_key(room_name),
        seq.to_be_bytes(),
    );
    batch.insert(
        &self.messages_by_room,
        keys::message_by_room(room_name, seq),
        encoded_meta,
    );
    // Bodies are keyed by message id, not by room/seq.
    batch.insert(&self.message_body, meta.id.as_bytes().to_vec(), encoded_body);
    batch.commit()?;

    Ok((seq, meta))
}
/// Returns up to `limit` message records of `room` whose sequence number is
/// greater than `after_seq`, in the order the keyspace yields them.
///
/// In the returned page, `last_seq` is the sequence number of the last
/// record included (or `after_seq` when none was), and `more` is `true` when
/// at least one further matching record exists beyond `limit`. Keys that do
/// not parse are skipped.
///
/// # Errors
///
/// Returns `Fjall` if reading an entry fails and `Decode` if a record cannot
/// be deserialized.
pub fn history(
    &self,
    room: &RoomId,
    after_seq: u64,
    limit: usize,
) -> Result<HistoryPage, CommsStoreError> {
    let prefix = keys::messages_by_room_prefix(room.as_str());
    let mut page = HistoryPage {
        messages: Vec::new(),
        last_seq: after_seq,
        more: false,
    };
    for entry in self.messages_by_room.prefix(&prefix) {
        let (key, raw) = entry.into_inner()?;
        let seq = match keys::parse_message_by_room(&key) {
            Some((_, seq)) if seq > after_seq => seq,
            _ => continue,
        };
        // Seeing one record past the limit is what sets `more`.
        if page.messages.len() >= limit {
            page.more = true;
            break;
        }
        page.messages.push((seq, rmp_serde::from_slice(&raw)?));
        page.last_seq = seq;
    }
    Ok(page)
}
pub fn history_with_seq(
&self,
room: &RoomId,
after_seq: u64,
limit: usize,
) -> Result<Vec<(u64, MessageMeta)>, CommsStoreError> {
let prefix = keys::messages_by_room_prefix(room.as_str());
let mut out = Vec::new();
for guard in self.messages_by_room.prefix(&prefix) {
let (k, v) = guard.into_inner()?;
let Some((_, seq)) = keys::parse_message_by_room(&k) else {
continue;
};
if seq <= after_seq {
continue;
}
if out.len() >= limit {
break;
}
out.push((seq, rmp_serde::from_slice(&v)?));
}
Ok(out)
}
pub fn get_body(&self, message_id: &str) -> Result<Option<Vec<u8>>, CommsStoreError> {
match self.message_body.get(message_id.as_bytes())? {
Some(v) => {
let body: MessageBody = rmp_serde::from_slice(&v)?;
Ok(Some(body.0))
}
None => Ok(None),
}
}
/// Maps message ids to the room and sequence number they were stored under.
///
/// Scans the whole `messages_by_room` keyspace, decoding each record, and
/// returns `(id, room, seq)` for every record whose id is in `message_ids`.
/// Ids that are never found are simply absent from the result. The scan stops
/// early once as many matches as distinct requested ids have been collected.
///
/// # Errors
///
/// Returns `Fjall` if reading an entry fails and `Decode` if a record cannot
/// be deserialized.
pub fn resolve_ids(
    &self,
    message_ids: &[String],
) -> Result<Vec<(String, RoomId, u64)>, CommsStoreError> {
    let mut resolved = Vec::with_capacity(message_ids.len());
    if message_ids.is_empty() {
        return Ok(resolved);
    }
    let wanted: ahash::AHashSet<&str> = message_ids.iter().map(String::as_str).collect();
    for entry in self.messages_by_room.iter() {
        let (key, raw) = entry.into_inner()?;
        let Some((_, seq)) = keys::parse_message_by_room(&key) else {
            continue;
        };
        let meta: MessageMeta = rmp_serde::from_slice(&raw)?;
        if !wanted.contains(meta.id.as_str()) {
            continue;
        }
        resolved.push((meta.id, meta.room, seq));
        if resolved.len() == wanted.len() {
            break;
        }
    }
    Ok(resolved)
}
/// Returns the read cursor stored for `agent` in `room`.
///
/// Returns 0 when no cursor is stored or when the stored value is not
/// exactly eight bytes long.
///
/// # Errors
///
/// Returns `Fjall` if the read fails.
pub fn read_cursor(&self, agent: &AgentId, room: &RoomId) -> Result<u64, CommsStoreError> {
    let stored = self
        .cursors
        .get(keys::cursor_key(agent.as_str(), room.as_str()))?;
    let Some(raw) = stored else {
        return Ok(0);
    };
    // The conversion succeeds only for an 8-byte value.
    match <[u8; 8]>::try_from(&raw[..]) {
        Ok(be) => Ok(u64::from_be_bytes(be)),
        Err(_) => Ok(0),
    }
}
/// Advances the read cursor of `agent` in `room` to `seq`.
///
/// The cursor only moves forward: when `seq` is not greater than the stored
/// value, nothing is written.
///
/// # Errors
///
/// Returns `Fjall` if reading the current cursor or writing the new one fails.
pub fn set_read_cursor(
    &self,
    agent: &AgentId,
    room: &RoomId,
    seq: u64,
) -> Result<(), CommsStoreError> {
    if self.read_cursor(agent, room)? >= seq {
        return Ok(());
    }
    self.cursors.insert(
        keys::cursor_key(agent.as_str(), room.as_str()),
        seq.to_be_bytes(),
    )?;
    Ok(())
}
}
/// One page of room history as returned by `CommsStore::history`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HistoryPage {
/// `(seq, meta)` pairs included in this page.
pub messages: Vec<(u64, MessageMeta)>,
/// Sequence number of the last record in `messages`, or the `after_seq`
/// passed to `history` when the page is empty.
pub last_seq: u64,
/// `true` when `history` stopped because the limit was reached while a
/// further matching record was available.
pub more: bool,
}
/// Hashes `body` with `crate::hashing::hash_bytes` and returns the result
/// rendered by `crate::hashing::hex`.
pub fn body_hash_hex(body: &[u8]) -> String {
    let digest = crate::hashing::hash_bytes(body);
    crate::hashing::hex(&digest)
}
/// Assembles the `MessageMeta` for a message about to be posted.
///
/// The caller supplies the identifying and descriptive fields; this function
/// fills in the timestamp (`now_micros()`), the body length (capped at
/// `u32::MAX` when the body is longer than a `u32` can express) and the body
/// hash (`body_hash_hex`).
// One argument per metadata field; the count is inherent to the record.
#[allow(clippy::too_many_arguments)]
pub fn build_meta(
    id: String,
    room: RoomId,
    from: AgentId,
    subject: String,
    tags: Vec<String>,
    reply_to: Option<String>,
    scope: Vec<String>,
    body: &[u8],
) -> MessageMeta {
    let ts_micros = now_micros();
    let body_len: u32 = body.len().try_into().unwrap_or(u32::MAX);
    let body_sha = body_hash_hex(body);
    MessageMeta {
        id,
        room,
        from,
        ts_micros,
        subject,
        tags,
        reply_to,
        scope,
        body_len,
        body_sha,
    }
}
fn acquire_lock(comms_dir: &Path) -> Result<File, CommsStoreError> {
let path = comms_dir.join(LOCK_FILE);
let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(&path)
.map_err(|source| CommsStoreError::Io {
path: path.clone(),
source,
})?;
for attempt in 0..LOCK_ATTEMPTS {
match file.try_lock_exclusive() {
Ok(()) => return Ok(file),
Err(_) if attempt + 1 < LOCK_ATTEMPTS => std::thread::sleep(LOCK_BACKOFF),
Err(_) => return Err(CommsStoreError::Locked(path)),
}
}
unreachable!("loop returns on the final attempt")
}
/// Reads the schema version recorded in the database at `dir`, if any.
///
/// Returns `None` when `dir` does not exist, when the database or its `meta`
/// keyspace cannot be opened, when no version is stored, or when the stored
/// value is not exactly four bytes long. Otherwise the four bytes are
/// interpreted as a big-endian `u32`.
fn peek_schema_version(dir: &Path) -> Option<u32> {
    if !dir.exists() {
        return None;
    }
    let db = Database::builder(dir).open().ok()?;
    let meta = db.keyspace("meta", KeyspaceCreateOptions::default).ok()?;
    // First `?` drops a read error, second one a missing key.
    let raw = meta.get(META_SCHEMA_VER).ok()??;
    <[u8; 4]>::try_from(&raw[..]).ok().map(u32::from_be_bytes)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::comms::model::AgentCard;
// Opens a store inside a fresh temporary directory. The directory handle is
// returned too so the caller can keep it alive while the store is in use.
fn temp_store() -> (tempfile::TempDir, CommsStore) {
let dir = tempfile::tempdir().expect("tempdir");
let store = CommsStore::open(dir.path()).expect("open store");
(dir, store)
}
// Parses `s` into a `RoomId`, panicking if it is rejected.
fn room_id(s: &str) -> RoomId {
RoomId::parse(s).expect("room")
}
// Parses `s` into an `AgentId`, panicking if it is rejected.
fn agent_id(s: &str) -> AgentId {
AgentId::parse(s).expect("agent")
}
// Posts one message and checks that `history` returns its metadata with the
// assigned seq, while the body is fetched separately through `get_body`.
#[test]
fn post_then_history_returns_meta_and_body_is_not_loaded() {
let (_d, store) = temp_store();
let room = room_id("room-1");
store
.put_room(&Room {
room_id: room.clone(),
scope: super::super::model::RoomScope::Global,
title: "t".to_string(),
created_at: now_micros(),
})
.expect("put room");
let body = b"the quick brown fox".to_vec();
let meta = build_meta(
"m-1".to_string(),
room.clone(),
agent_id("agent-1"),
"subj".to_string(),
vec![],
None,
vec![],
&body,
);
let (seq, _) = store
.post(&room, meta.clone(), MessageBody(body.clone()))
.expect("post");
assert_eq!(seq, 1, "first message in a room gets seq 1");
let page = store.history(&room, 0, 10).expect("history");
assert_eq!(page.messages.len(), 1);
let (got_seq, got) = &page.messages[0];
assert_eq!(
*got_seq, 1,
"history pairs each record with its per-room seq"
);
assert_eq!(got.id, "m-1");
assert_eq!(got.subject, "subj");
// Length and hash in the metadata must describe the posted body.
assert_eq!(got.body_len as usize, body.len());
assert_eq!(got.body_sha, body_hash_hex(&body));
let fetched = store.get_body("m-1").expect("get_body");
assert_eq!(fetched.as_deref(), Some(body.as_slice()));
// An unknown id yields `None`, not an error.
assert_eq!(store.get_body("nope").expect("get_body"), None);
}
// Posts five messages and reads them in pages of two, resuming the second
// page from the first page's `last_seq`.
#[test]
fn history_paginates_by_seq() {
let (_d, store) = temp_store();
let room = room_id("room-1");
for i in 0..5u32 {
let body = format!("body-{i}").into_bytes();
let meta = build_meta(
format!("m-{i}"),
room.clone(),
agent_id("a"),
format!("s-{i}"),
vec![],
None,
vec![],
&body,
);
store.post(&room, meta, MessageBody(body)).expect("post");
}
let page1 = store.history(&room, 0, 2).expect("history");
assert_eq!(page1.messages.len(), 2);
assert!(page1.more);
let page2 = store.history(&room, page1.last_seq, 2).expect("history");
assert_eq!(page2.messages.len(), 2);
// Page two starts at the third posted message.
assert_eq!(page2.messages[0].1.id, "m-2");
}
// Opens the same directory twice and checks that sequence numbers continue
// where they left off and that earlier messages are still readable.
#[test]
fn seq_counter_persists_across_reopen() {
let dir = tempfile::tempdir().expect("tempdir");
let room = room_id("room-1");
// Posts a message whose id doubles as subject and body; returns its seq.
let post = |store: &CommsStore, id: &str| {
let body = id.as_bytes().to_vec();
let meta = build_meta(
id.to_string(),
room.clone(),
agent_id("a"),
id.to_string(),
vec![],
None,
vec![],
&body,
);
store.post(&room, meta, MessageBody(body)).expect("post").0
};
// The inner scope drops the first store before the directory is reopened.
{
let store = CommsStore::open(dir.path()).expect("open");
assert_eq!(post(&store, "m-1"), 1);
assert_eq!(post(&store, "m-2"), 2);
}
{
let store = CommsStore::open(dir.path()).expect("reopen");
assert_eq!(post(&store, "m-3"), 3, "seq must continue past reopen");
let page = store.history(&room, 0, 10).expect("history");
assert_eq!(page.messages.len(), 3, "no message lost or overwritten");
let ids: Vec<&str> = page.messages.iter().map(|(_, m)| m.id.as_str()).collect();
assert_eq!(ids, ["m-1", "m-2", "m-3"]);
}
}
// Subscribes one agent to one room, checks both lookup directions, then
// unsubscribes and checks the room has no subscribers left.
#[test]
fn subscriptions_round_trip() {
let (_d, store) = temp_store();
let room = room_id("room-1");
let agent = agent_id("agent-1");
store
.subscribe(&Subscription {
agent_id: agent.clone(),
room: room.clone(),
created_at: now_micros(),
})
.expect("subscribe");
assert_eq!(store.subscribers(&room).expect("subs"), vec![agent.clone()]);
assert_eq!(
store.rooms_for_agent(&agent).expect("rooms"),
vec![room.clone()]
);
store.unsubscribe(&room, &agent).expect("unsub");
assert!(store.subscribers(&room).expect("subs").is_empty());
}
// The cursor starts at 0, moves forward to 5, and ignores a later attempt to
// move it back to 3.
#[test]
fn read_cursor_is_monotonic() {
let (_d, store) = temp_store();
let room = room_id("room-1");
let agent = agent_id("agent-1");
assert_eq!(store.read_cursor(&agent, &room).expect("read"), 0);
store.set_read_cursor(&agent, &room, 5).expect("set");
assert_eq!(store.read_cursor(&agent, &room).expect("read"), 5);
store.set_read_cursor(&agent, &room, 3).expect("set");
assert_eq!(store.read_cursor(&agent, &room).expect("read"), 5);
}
// Posts messages into two rooms and checks that `resolve_ids` reports the
// room and seq of each requested id, omits an unknown id, and returns an
// empty list for an empty request.
#[test]
fn resolve_ids_maps_each_id_to_its_room_and_seq() {
let (_d, store) = temp_store();
let room_a = room_id("room-a");
let room_b = room_id("room-b");
// Posts a message whose id doubles as subject and body; returns its seq.
let mk = |store: &CommsStore, room: &RoomId, id: &str| {
let body = id.as_bytes().to_vec();
let meta = build_meta(
id.to_string(),
room.clone(),
agent_id("a"),
id.to_string(),
vec![],
None,
vec![],
&body,
);
store.post(room, meta, MessageBody(body)).expect("post").0
};
let s_a1 = mk(&store, &room_a, "m-a1");
let _s_a2 = mk(&store, &room_a, "m-a2");
let s_b1 = mk(&store, &room_b, "m-b1");
let mut got = store
.resolve_ids(&["m-a1".to_string(), "m-b1".to_string(), "ghost".to_string()])
.expect("resolve_ids");
// Sort by id so the comparison does not depend on scan order.
got.sort_by(|x, y| x.0.cmp(&y.0));
assert_eq!(
got,
vec![
("m-a1".to_string(), room_a.clone(), s_a1),
("m-b1".to_string(), room_b.clone(), s_b1),
]
);
assert!(store.resolve_ids(&[]).expect("resolve_ids").is_empty());
}
// Stores an agent record and checks that reading it back by id returns an
// equal value.
#[test]
fn agent_records_round_trip() {
let (_d, store) = temp_store();
let rec = AgentRecord {
agent_id: agent_id("agent-1"),
card: AgentCard {
name: "n".to_string(),
description: "d".to_string(),
version: "1".to_string(),
skills: vec![],
},
kind: super::super::model::AgentKind::Cli,
first_seen: now_micros(),
last_seen: now_micros(),
};
store.put_agent(&rec).expect("put");
assert_eq!(
store.get_agent(&agent_id("agent-1")).expect("get"),
Some(rec)
);
}
}