mod types;
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::Value;
pub use types::*;
use crate::error::{Result, TinyAgentsError};
impl InMemoryStore {
    /// Builds a new store from the type's `Default` implementation.
    pub fn new() -> Self {
        Default::default()
    }
}
#[async_trait]
impl Store for InMemoryStore {
    /// Returns a clone of the value stored under `key` in `namespace`.
    ///
    /// Yields `Ok(None)` when either the namespace or the key is absent.
    ///
    /// # Errors
    /// Returns `TinyAgentsError::Validation` when `self.data.lock()` fails.
    async fn get(&self, namespace: &str, key: &str) -> Result<Option<Value>> {
        let guard = self
            .data
            .lock()
            .map_err(|e| TinyAgentsError::Validation(format!("store lock poisoned: {e}")))?;
        let hit = match guard.get(namespace) {
            Some(bucket) => bucket.get(key).cloned(),
            None => None,
        };
        Ok(hit)
    }

    /// Stores `value` under `key` in `namespace`, creating the namespace entry
    /// on first use and overwriting any value already stored under that key.
    ///
    /// # Errors
    /// Returns `TinyAgentsError::Validation` when `self.data.lock()` fails.
    async fn put(&self, namespace: &str, key: &str, value: Value) -> Result<()> {
        let mut guard = self
            .data
            .lock()
            .map_err(|e| TinyAgentsError::Validation(format!("store lock poisoned: {e}")))?;
        let bucket = guard.entry(namespace.to_owned()).or_default();
        bucket.insert(key.to_owned(), value);
        Ok(())
    }

    /// Removes `key` from `namespace`.
    ///
    /// Succeeds without doing anything when the namespace or key is absent.
    ///
    /// # Errors
    /// Returns `TinyAgentsError::Validation` when `self.data.lock()` fails.
    async fn delete(&self, namespace: &str, key: &str) -> Result<()> {
        let mut guard = self
            .data
            .lock()
            .map_err(|e| TinyAgentsError::Validation(format!("store lock poisoned: {e}")))?;
        // The removed value (if any) is intentionally discarded.
        let _ = guard.get_mut(namespace).map(|bucket| bucket.remove(key));
        Ok(())
    }

    /// Lists the keys currently held in `namespace`.
    ///
    /// An unknown namespace yields an empty vector.
    ///
    /// # Errors
    /// Returns `TinyAgentsError::Validation` when `self.data.lock()` fails.
    async fn list(&self, namespace: &str) -> Result<Vec<String>> {
        let guard = self
            .data
            .lock()
            .map_err(|e| TinyAgentsError::Validation(format!("store lock poisoned: {e}")))?;
        let keys = match guard.get(namespace) {
            Some(bucket) => bucket.keys().cloned().collect(),
            None => Vec::new(),
        };
        Ok(keys)
    }
}
impl FileStore {
    /// Creates a store whose files live underneath `root_dir`.
    ///
    /// Nothing is created on disk here; the path is only recorded.
    pub fn new(root_dir: impl Into<std::path::PathBuf>) -> Self {
        let root_dir = root_dir.into();
        Self { root_dir }
    }

    /// Validates a namespace, key, or stream name before it becomes a path component.
    ///
    /// A name is accepted only when it is non-empty, is not made up solely of
    /// dots (which rejects `.` and `..`), and contains nothing but ASCII
    /// alphanumerics, `-`, `_`, and `.`.
    ///
    /// # Errors
    /// Returns `TinyAgentsError::Validation` describing the first rule violated.
    fn sanitize(name: &str) -> Result<()> {
        if name.is_empty() {
            return Err(TinyAgentsError::Validation(
                "store namespace and key must not be empty".into(),
            ));
        }

        let raw = name.as_bytes();

        let only_dots = !raw.iter().any(|&b| b != b'.');
        if only_dots {
            return Err(TinyAgentsError::Validation(format!(
                "store name must not be all dots: {name:?} (path-traversal guard)"
            )));
        }

        let has_forbidden_byte = raw.iter().any(|&b| {
            !matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' | b'.')
        });
        if has_forbidden_byte {
            return Err(TinyAgentsError::Validation(format!(
                "store name contains invalid characters: {name:?} \
                 (only ASCII alphanumerics, hyphens, underscores, dots allowed)"
            )));
        }

        Ok(())
    }

    /// Computes `<root_dir>/<namespace>/<key>.json`.
    ///
    /// No validation happens here; callers run `sanitize` on both parts first.
    fn key_path(&self, namespace: &str, key: &str) -> std::path::PathBuf {
        let mut path = self.root_dir.join(namespace);
        path.push(format!("{key}.json"));
        path
    }
}
#[async_trait]
impl Store for FileStore {
    /// Reads and parses the JSON document stored for `key` in `namespace`.
    ///
    /// Returns `Ok(None)` when the backing file does not exist.
    ///
    /// # Errors
    /// * `TinyAgentsError::Validation` when `namespace` or `key` fails
    ///   `sanitize`, or when reading the file fails for any reason other than
    ///   the file being absent.
    /// * The error converted from `serde_json` when the file is not valid JSON.
    async fn get(&self, namespace: &str, key: &str) -> Result<Option<Value>> {
        Self::sanitize(namespace)?;
        Self::sanitize(key)?;
        let path = self.key_path(namespace, key);
        // Read directly instead of probing with `exists()` first: a probe
        // followed by a read races with concurrent deletes, and `exists()`
        // reports `false` for failures such as permission errors.
        let bytes = match fs::read(&path) {
            Ok(bytes) => bytes,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(e) => {
                return Err(TinyAgentsError::Validation(format!(
                    "store read error: {e}"
                )))
            }
        };
        let value: Value = serde_json::from_slice(&bytes)?;
        Ok(Some(value))
    }

    /// Writes `value` as pretty-printed JSON to the file for `key` in
    /// `namespace`, creating the namespace directory when needed and
    /// replacing any existing file content.
    ///
    /// # Errors
    /// * `TinyAgentsError::Validation` when `namespace` or `key` fails
    ///   `sanitize`, or when creating the directory or writing the file fails.
    /// * The error converted from `serde_json` when serialization fails.
    async fn put(&self, namespace: &str, key: &str, value: Value) -> Result<()> {
        Self::sanitize(namespace)?;
        Self::sanitize(key)?;
        let dir = self.root_dir.join(namespace);
        fs::create_dir_all(&dir)
            .map_err(|e| TinyAgentsError::Validation(format!("store mkdir error: {e}")))?;
        // Use the shared helper so every operation agrees on the file layout.
        let path = self.key_path(namespace, key);
        let bytes = serde_json::to_vec_pretty(&value)?;
        fs::write(&path, &bytes)
            .map_err(|e| TinyAgentsError::Validation(format!("store write error: {e}")))?;
        Ok(())
    }

    /// Deletes the file for `key` in `namespace`.
    ///
    /// Deleting a key that does not exist succeeds, so the call is safe to
    /// repeat.
    ///
    /// # Errors
    /// `TinyAgentsError::Validation` when `namespace` or `key` fails
    /// `sanitize`, or when removal fails for any reason other than the file
    /// being absent.
    async fn delete(&self, namespace: &str, key: &str) -> Result<()> {
        Self::sanitize(namespace)?;
        Self::sanitize(key)?;
        let path = self.key_path(namespace, key);
        // Attempt the removal and tolerate "not found" rather than checking
        // `exists()` first; otherwise a concurrent delete between the check
        // and the removal would surface as an error.
        match fs::remove_file(&path) {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(TinyAgentsError::Validation(format!(
                "store delete error: {e}"
            ))),
        }
    }

    /// Lists the keys in `namespace`: the names of directory entries ending in
    /// `.json`, with that suffix removed.
    ///
    /// Returns an empty vector when the namespace directory does not exist.
    /// Keys are returned in the order the directory iterator yields them.
    ///
    /// # Errors
    /// `TinyAgentsError::Validation` when `namespace` fails `sanitize`, or
    /// when opening or iterating the directory fails for any reason other
    /// than the directory being absent.
    async fn list(&self, namespace: &str) -> Result<Vec<String>> {
        Self::sanitize(namespace)?;
        let dir = self.root_dir.join(namespace);
        // Same reasoning as `get`: open the directory directly and treat only
        // "not found" as an empty namespace.
        let entries = match fs::read_dir(&dir) {
            Ok(entries) => entries,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => {
                return Err(TinyAgentsError::Validation(format!(
                    "store readdir error: {e}"
                )))
            }
        };
        let mut keys = Vec::new();
        for entry in entries {
            let entry = entry
                .map_err(|e| TinyAgentsError::Validation(format!("store entry error: {e}")))?;
            let file_name = entry.file_name();
            let name = file_name.to_string_lossy();
            // Entries without the `.json` suffix are not store keys; skip them.
            if let Some(stem) = name.strip_suffix(".json") {
                keys.push(stem.to_string());
            }
        }
        Ok(keys)
    }
}
impl InMemoryAppendStore {
    /// Builds a new append store from the type's `Default` implementation.
    pub fn new() -> Self {
        Default::default()
    }
}
#[async_trait]
impl AppendStore for InMemoryAppendStore {
    /// Appends `value` to `stream`, creating the stream on first use.
    ///
    /// The entry is stored together with the current `now_ms()` timestamp.
    /// Returns the zero-based offset of the new entry, i.e. the stream length
    /// before the append.
    ///
    /// # Errors
    /// Returns `TinyAgentsError::Validation` when `self.streams.lock()` fails.
    async fn append(&self, stream: &str, value: Value) -> Result<u64> {
        let mut guard = self
            .streams
            .lock()
            .map_err(|e| TinyAgentsError::Validation(format!("append store lock poisoned: {e}")))?;
        let log = guard.entry(stream.to_owned()).or_default();
        let offset = log.len() as u64;
        log.push((now_ms(), value));
        Ok(offset)
    }

    /// Returns `(offset, value)` pairs for every entry of `stream` at or after
    /// `offset`, with values cloned.
    ///
    /// An unknown stream, or an `offset` past the end, yields an empty vector.
    ///
    /// # Errors
    /// Returns `TinyAgentsError::Validation` when `self.streams.lock()` fails.
    async fn read_from(&self, stream: &str, offset: u64) -> Result<Vec<(u64, Value)>> {
        let guard = self
            .streams
            .lock()
            .map_err(|e| TinyAgentsError::Validation(format!("append store lock poisoned: {e}")))?;
        let mut out = Vec::new();
        if let Some(log) = guard.get(stream) {
            // The stored timestamp is not part of the returned pairs.
            for (idx, (_ts, value)) in log.iter().enumerate().skip(offset as usize) {
                out.push((idx as u64, value.clone()));
            }
        }
        Ok(out)
    }

    /// Returns the number of entries in `stream`, or `0` for an unknown stream.
    ///
    /// # Errors
    /// Returns `TinyAgentsError::Validation` when `self.streams.lock()` fails.
    async fn len(&self, stream: &str) -> Result<u64> {
        let guard = self
            .streams
            .lock()
            .map_err(|e| TinyAgentsError::Validation(format!("append store lock poisoned: {e}")))?;
        let count = match guard.get(stream) {
            Some(log) => log.len() as u64,
            None => 0,
        };
        Ok(count)
    }
}
impl JsonlAppendStore {
    /// Creates a store whose stream files live directly inside `root_dir`.
    ///
    /// Nothing is created on disk here; the path is only recorded.
    pub fn new(root_dir: impl Into<std::path::PathBuf>) -> Self {
        Self {
            root_dir: root_dir.into(),
        }
    }

    /// Computes `<root_dir>/<stream>.jsonl` after validating `stream`.
    ///
    /// # Errors
    /// Returns the `TinyAgentsError::Validation` produced by
    /// `FileStore::sanitize` when `stream` is not an acceptable name.
    fn stream_path(&self, stream: &str) -> Result<std::path::PathBuf> {
        FileStore::sanitize(stream)?;
        Ok(self.root_dir.join(format!("{stream}.jsonl")))
    }

    /// Loads every record from the JSONL file at `path`, in file order.
    ///
    /// Lines that are empty or whitespace-only are skipped. A missing file is
    /// treated as an empty stream.
    ///
    /// # Errors
    /// * `TinyAgentsError::Validation` when reading fails for any reason other
    ///   than the file being absent.
    /// * The error converted from `serde_json` when a line does not parse as
    ///   a `StoreRecord`.
    fn read_records(path: &std::path::Path) -> Result<Vec<StoreRecord>> {
        // Read directly instead of probing with `exists()` first: the probe
        // races with concurrent removal and reports `false` for failures such
        // as permission errors, which would look like an empty stream.
        let text = match fs::read_to_string(path) {
            Ok(text) => text,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => {
                return Err(TinyAgentsError::Validation(format!(
                    "append store read error: {e}"
                )))
            }
        };
        let mut records = Vec::new();
        for line in text.lines() {
            if line.trim().is_empty() {
                continue;
            }
            let record: StoreRecord = serde_json::from_str(line)?;
            records.push(record);
        }
        Ok(records)
    }
}
#[async_trait]
impl AppendStore for JsonlAppendStore {
    /// Appends `value` to the JSONL file for `stream` as one serialized
    /// `StoreRecord` line and returns the record's offset.
    ///
    /// The offset is the number of records already present in the file, which
    /// is obtained by re-reading the file on every call.
    ///
    /// # Errors
    /// * `TinyAgentsError::Validation` when `stream` is rejected by
    ///   `stream_path`, or when creating the root directory, reading, opening,
    ///   or writing the file fails.
    /// * The error converted from `serde_json` when an existing line fails to
    ///   parse or the new record fails to serialize.
    async fn append(&self, stream: &str, value: Value) -> Result<u64> {
        use std::io::Write as _;

        let target = self.stream_path(stream)?;
        fs::create_dir_all(&self.root_dir)
            .map_err(|e| TinyAgentsError::Validation(format!("append store mkdir error: {e}")))?;

        let existing = Self::read_records(&target)?;
        let offset = existing.len() as u64;

        let created_at_ms = now_ms();
        let record = StoreRecord {
            offset,
            value,
            created_at_ms,
        };

        // Serialize the record and its trailing newline into one buffer so it
        // is handed to the file in a single `write_all` call.
        let payload = {
            let mut text = serde_json::to_string(&record)?;
            text.push('\n');
            text
        };

        let mut options = fs::OpenOptions::new();
        options.create(true).append(true);
        let mut file = options
            .open(&target)
            .map_err(|e| TinyAgentsError::Validation(format!("append store open error: {e}")))?;
        file.write_all(payload.as_bytes())
            .map_err(|e| TinyAgentsError::Validation(format!("append store write error: {e}")))?;
        Ok(offset)
    }

    /// Returns `(offset, value)` pairs for the records of `stream`, skipping
    /// the first `offset` records by position in the file.
    ///
    /// The offset in each pair is the one stored in the record itself.
    ///
    /// # Errors
    /// Propagates errors from `stream_path` and `read_records`.
    async fn read_from(&self, stream: &str, offset: u64) -> Result<Vec<(u64, Value)>> {
        let target = self.stream_path(stream)?;
        let records = Self::read_records(&target)?;
        let mut out = Vec::new();
        for record in records.into_iter().skip(offset as usize) {
            out.push((record.offset, record.value));
        }
        Ok(out)
    }

    /// Returns the number of records stored for `stream`.
    ///
    /// # Errors
    /// Propagates errors from `stream_path` and `read_records`.
    async fn len(&self, stream: &str) -> Result<u64> {
        let target = self.stream_path(stream)?;
        let records = Self::read_records(&target)?;
        Ok(records.len() as u64)
    }
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
impl StoreRegistry {
    /// Creates a registry with no named stores and a fresh `InMemoryStore`
    /// as its default store.
    pub fn new() -> Self {
        let stores = HashMap::new();
        let fallback = Arc::new(InMemoryStore::new());
        Self {
            stores,
            default_store: fallback,
        }
    }

    /// Registers `store` under `name`, replacing any store already registered
    /// under that name.
    ///
    /// Returns `&mut Self` so calls can be chained.
    pub fn register(&mut self, name: impl Into<String>, store: Arc<dyn Store>) -> &mut Self {
        let key = name.into();
        // A previously registered store with the same name is dropped here.
        let _ = self.stores.insert(key, store);
        self
    }

    /// Returns a shared handle to the store registered under `name`, or
    /// `None` when no such store exists.
    pub fn get(&self, name: &str) -> Option<Arc<dyn Store>> {
        self.stores.get(name).map(Arc::clone)
    }

    /// Returns a shared handle to the registry's default store.
    pub fn default_store(&self) -> Arc<dyn Store> {
        self.default_store.clone()
    }
}
impl Default for StoreRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod test;