use std::collections::{BTreeMap, HashMap};
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use crate::error::MemMapError;
use crate::storage::WalCommand;
/// Metadata describing one memory entry tracked by `MemMapEngine`.
///
/// Records of this type are stored in `MemMapEngine::metadata_index`, keyed by
/// their `id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryMetadata {
/// Identifier of the entry; also the key under which it is indexed.
pub id: u64,
/// Identifier of the chunk the entry refers to.
/// NOTE(review): chunk semantics are defined outside this file — confirm.
pub chunk_id: u32,
/// Position of the entry; presumably a byte offset within the chunk — confirm.
pub offset: u64,
/// Size of the entry; presumably in bytes — confirm.
pub length: u64,
/// Tags attached to the entry.
pub tags: Vec<String>,
}
/// State snapshot published through the engine's `watch` channel.
///
/// `Default` yields an empty `status`, zeroed counters and an empty `extra`
/// map; that default is the value the channel is seeded with in
/// `MemMapEngine::new`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AgentState {
/// Free-form status string.
pub status: String,
/// Count of active memories as reported by whoever builds the snapshot.
/// NOTE(review): nothing in this file computes it — confirm the producer.
pub active_memory_count: u64,
/// Presumably the id of the most recently updated memory — confirm.
pub last_updated_id: u64,
/// Additional string key/value pairs carried with the snapshot.
pub extra: HashMap<String, String>,
}
/// One segment of a named stream.
///
/// `MemMapEngine::streams` keeps a `Vec<StreamSegment>` per stream key, with
/// segments pushed in the order they are appended.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamSegment {
/// Identifier of the chunk the segment refers to.
pub chunk_id: u32,
/// Position of the segment; presumably a byte offset within the chunk — confirm.
pub offset: u64,
/// Size of the segment; presumably in bytes — confirm.
pub length: u64,
}
/// In-memory state: a metadata index, a key/value store, named streams, and a
/// `watch` channel carrying the current `AgentState`.
pub struct MemMapEngine {
/// Memory metadata keyed by memory id (ordered by id via `BTreeMap`).
pub metadata_index: BTreeMap<u64, MemoryMetadata>,
/// Key/value store holding raw byte values.
pub kv_store: HashMap<String, Vec<u8>>,
/// Segments of each stream, keyed by stream name, in append order.
pub streams: HashMap<String, Vec<StreamSegment>>,
/// Sending half of the state channel; receivers are handed out by
/// `new` and `subscribe_state`.
state_tx: watch::Sender<AgentState>,
/// Next id that `next_memory_id` will return; starts at 1.
next_id: u64,
}
impl MemMapEngine {
    /// Creates an empty engine together with a receiver for its state channel.
    ///
    /// The channel is seeded with `AgentState::default()` and the id counter
    /// starts at 1.
    pub fn new() -> (Self, watch::Receiver<AgentState>) {
        let (state_tx, state_rx) = watch::channel(AgentState::default());
        let engine = Self {
            metadata_index: BTreeMap::new(),
            kv_store: HashMap::new(),
            streams: HashMap::new(),
            state_tx,
            next_id: 1,
        };
        (engine, state_rx)
    }

    /// Applies `commands` in order to the in-memory state.
    ///
    /// * `AppendMemory` inserts (or overwrites) the metadata record for `id`
    ///   and keeps the id counter ahead of it.
    /// * `SetKv` inserts or overwrites a key; `DeleteKv` removes it (a missing
    ///   key is a no-op).
    /// * `AppendStream` pushes a segment onto the named stream, creating the
    ///   stream if it does not exist yet.
    pub fn replay_commands(&mut self, commands: &[WalCommand]) {
        for cmd in commands {
            match cmd {
                WalCommand::AppendMemory {
                    id,
                    chunk_id,
                    offset,
                    length,
                    tags,
                } => {
                    // Routed through `insert_metadata` so replayed and live
                    // inserts maintain the id counter in exactly the same way.
                    self.insert_metadata(MemoryMetadata {
                        id: *id,
                        chunk_id: *chunk_id,
                        offset: *offset,
                        length: *length,
                        tags: tags.clone(),
                    });
                }
                WalCommand::SetKv { key, value } => {
                    self.kv_store.insert(key.clone(), value.clone());
                }
                WalCommand::DeleteKv { key } => {
                    self.kv_store.remove(key);
                }
                WalCommand::AppendStream {
                    key,
                    chunk_id,
                    offset,
                    length,
                } => {
                    self.streams
                        .entry(key.clone())
                        .or_default()
                        .push(StreamSegment {
                            chunk_id: *chunk_id,
                            offset: *offset,
                            length: *length,
                        });
                }
            }
        }
    }

    /// Returns the next unused memory id and advances the counter by one.
    pub fn next_memory_id(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        id
    }

    /// Inserts `meta` into the index under `meta.id`, replacing any existing
    /// record with that id.
    ///
    /// The id counter is moved past `meta.id` when necessary, so a later
    /// `next_memory_id` call cannot hand out an id that is already indexed.
    /// This matches the state obtained by replaying the equivalent
    /// `AppendMemory` command.
    pub fn insert_metadata(&mut self, meta: MemoryMetadata) {
        // `saturating_add` keeps an id of `u64::MAX` from overflowing the
        // counter (panic in debug builds, wrap-around to 0 in release).
        self.next_id = self.next_id.max(meta.id.saturating_add(1));
        self.metadata_index.insert(meta.id, meta);
    }

    /// Looks up the metadata record for `id`, if one is indexed.
    pub fn get_metadata(&self, id: u64) -> Option<&MemoryMetadata> {
        self.metadata_index.get(&id)
    }

    /// Iterates over all indexed memory ids in ascending order.
    pub fn all_ids(&self) -> impl Iterator<Item = u64> + '_ {
        self.metadata_index.keys().copied()
    }

    /// Publishes `state` as the current value of the state channel.
    ///
    /// Uses `send_replace`, which stores the value regardless of whether any
    /// receiver is currently alive, so this always returns `Ok(())`; the
    /// `Result` return type is kept for callers that already propagate it.
    pub fn broadcast_state(&self, state: AgentState) -> Result<(), MemMapError> {
        // The previous state returned by `send_replace` is intentionally dropped.
        self.state_tx.send_replace(state);
        Ok(())
    }

    /// Creates a new receiver for the state channel.
    pub fn subscribe_state(&self) -> watch::Receiver<AgentState> {
        self.state_tx.subscribe()
    }

    /// Returns a clone of the most recently published state.
    pub fn current_state(&self) -> AgentState {
        self.state_tx.borrow().clone()
    }
}