pub mod error;
pub mod storage;
pub mod engine;
pub mod search;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tokio::sync::watch;
use crate::engine::{AgentState, MemMapEngine};
use crate::error::MemMapError;
use crate::search::SearchProvider;
use crate::storage::{BlockStorage, LogManager, WalCommand};
/// Handle to a memory store rooted at a directory on disk.
///
/// Cloning is cheap: the engine, WAL, block storage and search index are shared
/// through `Arc`s and the state receiver is cloned from the same channel, so every
/// clone operates on the same underlying store.
#[derive(Clone)]
pub struct MemMapFS {
    /// Directory given to [`MemMapFS::init`]; `init` creates `index` and `blocks` inside it.
    root_dir: PathBuf,
    /// In-memory state (memory metadata, KV entries, streams) rebuilt from the WAL in `init`.
    engine: Arc<RwLock<MemMapEngine>>,
    /// Write-ahead log; KV, memory and stream writes are appended here.
    log_manager: Arc<RwLock<LogManager>>,
    /// Byte storage written via `append` and read back by (chunk, offset, length).
    block_storage: Arc<RwLock<BlockStorage>>,
    /// Search index used by `query_memory`, `index` and `search`.
    search_provider: Arc<SearchProvider>,
    /// Receiver half of the agent-state watch channel created by `MemMapEngine::new`.
    state_rx: watch::Receiver<AgentState>,
}
impl MemMapFS {
pub async fn init<P: Into<PathBuf>>(root: P) -> Result<Self, MemMapError> {
let root_dir: PathBuf = root.into();
tokio::fs::create_dir_all(&root_dir).await?;
tokio::fs::create_dir_all(root_dir.join("index")).await?;
tokio::fs::create_dir_all(root_dir.join("blocks")).await?;
let log_manager = LogManager::open(&root_dir)?;
let block_storage = BlockStorage::open(&root_dir)?;
let search_provider = SearchProvider::open(&root_dir)?;
let commands = log_manager.replay()?;
let (mut engine_inner, state_rx) = MemMapEngine::new();
engine_inner.replay_commands(&commands);
Ok(Self {
root_dir,
engine: Arc::new(RwLock::new(engine_inner)),
log_manager: Arc::new(RwLock::new(log_manager)),
block_storage: Arc::new(RwLock::new(block_storage)),
search_provider: Arc::new(search_provider),
state_rx,
})
}
pub async fn append_memory(&self, text: &str, tags: Vec<String>) -> Result<u64, MemMapError> {
let id = {
let mut eng = self.engine.write().unwrap();
eng.next_memory_id()
};
let data = text.as_bytes();
let (chunk_id, offset, length) = {
let mut store = self.block_storage.write().unwrap();
store.append(data)?
};
let cmd = WalCommand::AppendMemory {
id,
chunk_id,
offset,
length,
tags: tags.clone(),
};
{
let mut wal = self.log_manager.write().unwrap();
wal.append(&cmd)?;
}
{
let mut eng = self.engine.write().unwrap();
eng.insert_metadata(engine::MemoryMetadata {
id,
chunk_id,
offset,
length,
tags: tags.clone(),
});
}
self.search_provider.index_memory(id, text, &tags).await?;
Ok(id)
}
pub async fn query_memory(&self, query: &str) -> Result<Vec<String>, MemMapError> {
const MAX_RESULTS: usize = 20;
let ids = self.search_provider.search(query, MAX_RESULTS)?;
let mut results = Vec::with_capacity(ids.len());
for id in ids {
let meta = {
let eng = self.engine.read().unwrap();
eng.get_metadata(id).cloned()
};
if let Some(meta) = meta {
let bytes = {
let store = self.block_storage.read().unwrap();
store.read(meta.chunk_id, meta.offset, meta.length)?
};
results.push(String::from_utf8_lossy(&bytes).into_owned());
}
}
Ok(results)
}
pub async fn update_state(&self, state: AgentState) -> Result<(), MemMapError> {
let eng = self.engine.write().unwrap();
eng.broadcast_state(state)
}
pub fn subscribe_state(&self) -> watch::Receiver<AgentState> {
self.state_rx.clone()
}
pub fn get_kv(&self, key: &str) -> Option<Vec<u8>> {
let eng = self.engine.read().unwrap();
eng.kv_store.get(key).cloned()
}
pub async fn set_kv(&self, key: String, value: Vec<u8>) -> Result<(), MemMapError> {
let cmd = WalCommand::SetKv {
key: key.clone(),
value: value.clone(),
};
{
let mut wal = self.log_manager.write().unwrap();
wal.append(&cmd)?;
}
{
let mut eng = self.engine.write().unwrap();
eng.kv_store.insert(key, value);
}
Ok(())
}
pub async fn delete_kv(&self, key: String) -> Result<(), MemMapError> {
let cmd = WalCommand::DeleteKv { key: key.clone() };
{
let mut wal = self.log_manager.write().unwrap();
wal.append(&cmd)?;
}
{
let mut eng = self.engine.write().unwrap();
eng.kv_store.remove(&key);
}
Ok(())
}
pub fn root_dir(&self) -> &PathBuf {
&self.root_dir
}
pub async fn put_session(&self, session: &Session) -> Result<(), MemMapError> {
let key = format!("sessions/{}", session.id);
let bytes = bincode::serialize(session)?;
self.set_kv(key, bytes).await
}
pub fn get_session(&self, id: &str) -> Result<Option<Session>, MemMapError> {
let key = format!("sessions/{}", id);
if let Some(bytes) = self.get_kv(&key) {
let session = bincode::deserialize(&bytes)?;
Ok(Some(session))
} else {
Ok(None)
}
}
pub fn list_sessions(&self, limit: usize) -> Result<Vec<Session>, MemMapError> {
let eng = self.engine.read().unwrap();
let mut sessions = Vec::new();
for (key, val) in &eng.kv_store {
if key.starts_with("sessions/") {
if let Ok(session) = bincode::deserialize::<Session>(val) {
sessions.push(session);
}
}
}
sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
sessions.truncate(limit);
Ok(sessions)
}
pub fn list_by_intent(&self, intent_id: &str) -> Result<Vec<Session>, MemMapError> {
let eng = self.engine.read().unwrap();
let mut sessions = Vec::new();
for (key, val) in &eng.kv_store {
if key.starts_with("sessions/") {
if let Ok(session) = bincode::deserialize::<Session>(val) {
if session.intent_id == intent_id {
sessions.push(session);
}
}
}
}
sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
Ok(sessions)
}
pub async fn append_stream(&self, key: &str, data: &[u8]) -> Result<(), MemMapError> {
let (chunk_id, offset, length) = {
let mut store = self.block_storage.write().unwrap();
store.append(data)?
};
let cmd = WalCommand::AppendStream {
key: key.to_string(),
chunk_id,
offset,
length,
};
{
let mut wal = self.log_manager.write().unwrap();
wal.append(&cmd)?;
}
{
let mut eng = self.engine.write().unwrap();
eng.streams
.entry(key.to_string())
.or_default()
.push(crate::engine::StreamSegment {
chunk_id,
offset,
length,
});
}
Ok(())
}
pub fn open_read(&self, key: &str) -> Result<StreamReader, MemMapError> {
let segments = {
let eng = self.engine.read().unwrap();
eng.streams.get(key).cloned().unwrap_or_default()
};
Ok(StreamReader {
block_storage: self.block_storage.clone(),
segments,
current_segment_idx: 0,
current_segment_offset: 0,
})
}
pub async fn index(&self, key: &str, text: &str) -> Result<(), MemMapError> {
self.search_provider.index(key, text).await
}
pub fn search(&self, query: &str, limit: usize) -> Result<Vec<crate::search::Hit>, MemMapError> {
self.search_provider.search_hits(query, limit)
}
}
/// A persisted agent session, stored bincode-encoded under the KV key `sessions/<id>`.
///
/// Field order is part of the stored encoding (bincode writes fields positionally),
/// so fields must not be reordered without a migration.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct Session {
    /// Session identifier. It forms the storage key, so saving another session with
    /// the same id overwrites this one.
    pub id: String,
    /// Intent this session is grouped under; matched by `MemMapFS::list_by_intent`.
    pub intent_id: String,
    /// Creation timestamp; listings sort on it newest-first. Units are not fixed
    /// here - presumably seconds or milliseconds since the epoch (TODO confirm).
    pub created_at: u64,
    /// Opaque application payload.
    pub payload: Vec<u8>,
}
/// Sequential [`std::io::Read`] adapter over the segments of one stream.
///
/// Produced by `MemMapFS::open_read`. It holds a copy of the stream's segment list
/// taken at open time, so segments appended later are not seen by this reader.
pub struct StreamReader {
    /// Shared block storage that segment bytes are read from.
    block_storage: Arc<RwLock<BlockStorage>>,
    /// Segments to read, in order.
    segments: Vec<crate::engine::StreamSegment>,
    /// Index into `segments` of the segment currently being read.
    current_segment_idx: usize,
    /// Number of bytes already consumed from the current segment.
    current_segment_offset: u64,
}
impl std::io::Read for StreamReader {
    /// Copies up to `buf.len()` bytes from the current segment into `buf`.
    ///
    /// A single call never crosses a segment boundary, so it may return fewer bytes
    /// than requested even when more data follows. Exhausted and zero-length segments
    /// are skipped. `Ok(0)` is returned only when every segment has been consumed or
    /// when `buf` is empty.
    ///
    /// # Errors
    /// - `ErrorKind::Other` if the block-storage lock is poisoned or block storage
    ///   fails the read (carrying its message);
    /// - `ErrorKind::UnexpectedEof` if block storage returns no bytes although the
    ///   segment is not exhausted (previously this was misreported as end-of-stream);
    /// - `ErrorKind::InvalidData` if block storage returns more bytes than requested
    ///   (previously a panic in `copy_from_slice`).
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // Loop (rather than recurse) past exhausted segments so long runs of empty
        // appends cannot grow the stack.
        while let Some(seg) = self.segments.get(self.current_segment_idx) {
            let remaining = seg.length - self.current_segment_offset;
            if remaining == 0 {
                self.current_segment_idx += 1;
                self.current_segment_offset = 0;
                continue;
            }

            let read_len = (buf.len() as u64).min(remaining);
            let bytes = {
                let store = self.block_storage.read().map_err(|_| {
                    std::io::Error::new(std::io::ErrorKind::Other, "block storage lock poisoned")
                })?;
                store
                    .read(
                        seg.chunk_id,
                        seg.offset + self.current_segment_offset,
                        read_len,
                    )
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?
            };

            let n = bytes.len();
            if n == 0 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "block storage returned no data for a non-exhausted stream segment",
                ));
            }
            if (n as u64) > read_len {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "block storage returned more bytes than requested",
                ));
            }
            buf[..n].copy_from_slice(&bytes);
            self.current_segment_offset += n as u64;
            return Ok(n);
        }
        Ok(0)
    }
}