use crate::memory::context::{ContextStats, ContextWindow, ContextWindowConfig};
use crate::memory::embeddings::{Embedding, Memory, ScoredMemory};
use crate::memory::error::PersistenceError;
use crate::memory::persistence::{self, AgentStateSnapshot, PersistenceConfig};
use crate::messages::Message;
use crate::types::{AgentId, ConversationId, MemoryId, MessageId};
use acton_reactive::prelude::*;
use libsql::{Connection, Database};
#[acton_message]
pub struct InitMemoryStore {
pub config: PersistenceConfig,
}
#[acton_message]
pub struct CreateConversation {
pub agent_id: AgentId,
}
#[acton_message]
pub struct ConversationCreated {
pub conversation_id: ConversationId,
}
#[acton_message]
pub struct SaveMessage {
pub conversation_id: ConversationId,
pub message: Message,
}
#[acton_message]
pub struct MessageSaved {
pub message_id: MessageId,
}
#[acton_message]
pub struct LoadConversation {
pub conversation_id: ConversationId,
}
#[acton_message]
pub struct ConversationLoaded {
pub conversation_id: ConversationId,
pub messages: Vec<Message>,
}
#[acton_message]
pub struct GetLatestConversation {
pub agent_id: AgentId,
}
#[acton_message]
pub struct LatestConversationResponse {
pub conversation_id: Option<ConversationId>,
}
#[acton_message]
pub struct SaveAgentState {
pub snapshot: AgentStateSnapshot,
}
#[acton_message]
pub struct LoadAgentState {
pub agent_id: AgentId,
}
#[acton_message]
pub struct AgentStateLoaded {
pub snapshot: Option<AgentStateSnapshot>,
}
#[acton_message]
pub struct DeleteConversation {
pub conversation_id: ConversationId,
}
#[acton_message]
pub struct ListConversations {
pub agent_id: AgentId,
}
#[acton_message]
pub struct ConversationList {
pub conversations: Vec<ConversationId>,
}
#[acton_message]
pub struct StoreMemory {
pub agent_id: AgentId,
pub content: String,
pub embedding: Option<Embedding>,
}
#[acton_message]
pub struct MemoryStored {
pub memory_id: MemoryId,
}
#[acton_message]
pub struct SearchMemories {
pub agent_id: AgentId,
pub query_embedding: Embedding,
pub limit: usize,
pub min_similarity: Option<f32>,
}
#[acton_message]
pub struct MemorySearchResults {
pub results: Vec<ScoredMemory>,
}
#[acton_message]
pub struct LoadMemories {
pub agent_id: AgentId,
pub limit: Option<usize>,
}
#[acton_message]
pub struct MemoriesLoaded {
pub memories: Vec<Memory>,
}
#[acton_message]
pub struct DeleteMemory {
pub memory_id: MemoryId,
}
#[acton_message]
pub struct DeleteAgentMemories {
pub agent_id: AgentId,
}
#[acton_message]
struct SetConnection {
conn: Connection,
}
#[acton_message]
pub struct GetContextWindow {
pub agent_id: AgentId,
pub system_prompt: String,
pub conversation: Vec<Message>,
pub query_embedding: Option<Embedding>,
pub max_tokens: usize,
pub memory_limit: usize,
}
#[acton_message]
pub struct ContextWindowResponse {
pub messages: Vec<Message>,
pub stats: ContextStats,
pub included_memories: usize,
}
#[derive(Debug, Clone, Default)]
pub struct MemoryStoreMetrics {
pub conversations_created: u64,
pub messages_saved: u64,
pub conversations_loaded: u64,
pub state_saves: u64,
pub state_loads: u64,
pub memories_stored: u64,
pub memory_searches: u64,
pub context_windows_built: u64,
}
#[acton_actor]
pub struct MemoryStore {
pub config: Option<PersistenceConfig>,
pub database: Option<Database>,
pub connection: Option<Connection>,
pub shutting_down: bool,
pub metrics: MemoryStoreMetrics,
}
impl MemoryStore {
pub async fn spawn(runtime: &mut ActorRuntime) -> ActorHandle {
let mut builder = runtime.new_actor_with_name::<MemoryStore>("memory_store".to_string());
builder
.before_start(|_actor| {
tracing::debug!("Memory Store initializing");
Reply::ready()
})
.after_start(|actor| {
tracing::info!(config = ?actor.model.config, "Memory Store ready");
Reply::ready()
})
.before_stop(|actor| {
tracing::info!(
conversations_created = actor.model.metrics.conversations_created,
messages_saved = actor.model.metrics.messages_saved,
"Memory Store shutting down"
);
Reply::ready()
});
configure_handlers(&mut builder);
builder.start().await
}
}
fn configure_handlers(builder: &mut ManagedActor<Idle, MemoryStore>) {
configure_init_handler(builder);
configure_conversation_handlers(builder);
configure_message_handlers(builder);
configure_state_handlers(builder);
configure_memory_handlers(builder);
}
fn configure_init_handler(builder: &mut ManagedActor<Idle, MemoryStore>) {
builder.mutate_on::<SetConnection>(|actor, envelope| {
actor.model.connection = Some(envelope.message().conn.clone());
tracing::info!("Memory Store connection established");
Reply::ready()
});
builder.mutate_on::<InitMemoryStore>(|actor, envelope| {
let config = envelope.message().config.clone();
let actor_handle = actor.handle().clone();
actor.model.config = Some(config.clone());
let handle = tokio::spawn(async move {
match initialize_database(&config).await {
Ok((_db, conn)) => {
actor_handle.send(SetConnection { conn }).await;
tracing::info!(db_path = %config.db_path, "Memory Store initialized with database");
}
Err(e) => {
tracing::error!(error = %e, "Memory Store initialization failed");
}
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
}
async fn initialize_database(
config: &PersistenceConfig,
) -> Result<(Database, Connection), PersistenceError> {
let db = persistence::open_database(config).await?;
let conn = db
.connect()
.map_err(|e| PersistenceError::connection_error(e.to_string()))?;
persistence::initialize_schema(&conn).await?;
Ok((db, conn))
}
fn configure_conversation_handlers(builder: &mut ManagedActor<Idle, MemoryStore>) {
builder.mutate_on::<CreateConversation>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting CreateConversation - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let agent_id = envelope.message().agent_id.clone();
let reply = envelope.reply_envelope();
actor.model.metrics.conversations_created += 1;
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
match persistence::create_conversation(&conn, &agent_id).await {
Ok(conversation_id) => {
reply.send(ConversationCreated { conversation_id }).await;
}
Err(e) => {
tracing::error!(agent_id = %agent_id, error = %e, "Failed to create conversation");
}
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
builder.mutate_on::<LoadConversation>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting LoadConversation - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let conversation_id = envelope.message().conversation_id.clone();
let reply = envelope.reply_envelope();
actor.model.metrics.conversations_loaded += 1;
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
match persistence::load_conversation_messages(&conn, &conversation_id).await {
Ok(messages) => {
reply
.send(ConversationLoaded {
conversation_id,
messages,
})
.await;
}
Err(e) => {
tracing::error!(conversation_id = %conversation_id, error = %e, "Failed to load conversation");
}
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
builder.mutate_on::<GetLatestConversation>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting GetLatestConversation - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let agent_id = envelope.message().agent_id.clone();
let reply = envelope.reply_envelope();
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
match persistence::get_latest_conversation(&conn, &agent_id).await {
Ok(conversation_id) => {
reply
.send(LatestConversationResponse { conversation_id })
.await;
}
Err(e) => {
tracing::error!(agent_id = %agent_id, error = %e, "Failed to get latest conversation");
}
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
builder.mutate_on::<DeleteConversation>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting DeleteConversation - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let conversation_id = envelope.message().conversation_id.clone();
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
if let Err(e) = persistence::delete_conversation(&conn, &conversation_id).await {
tracing::error!(conversation_id = %conversation_id, error = %e, "Failed to delete conversation");
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
builder.mutate_on::<ListConversations>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting ListConversations - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let agent_id = envelope.message().agent_id.clone();
let reply = envelope.reply_envelope();
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
match persistence::list_conversations(&conn, &agent_id).await {
Ok(conversations) => {
reply.send(ConversationList { conversations }).await;
}
Err(e) => {
tracing::error!(agent_id = %agent_id, error = %e, "Failed to list conversations");
}
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
}
fn configure_message_handlers(builder: &mut ManagedActor<Idle, MemoryStore>) {
builder.mutate_on::<SaveMessage>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting SaveMessage - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let msg = envelope.message();
let conversation_id = msg.conversation_id.clone();
let message = msg.message.clone();
let reply = envelope.reply_envelope();
actor.model.metrics.messages_saved += 1;
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
match persistence::save_message(&conn, &conversation_id, &message).await {
Ok(message_id) => {
reply.send(MessageSaved { message_id }).await;
}
Err(e) => {
tracing::error!(conversation_id = %conversation_id, error = %e, "Failed to save message");
}
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
}
fn configure_state_handlers(builder: &mut ManagedActor<Idle, MemoryStore>) {
builder.mutate_on::<SaveAgentState>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting SaveAgentState - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let snapshot = envelope.message().snapshot.clone();
actor.model.metrics.state_saves += 1;
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
if let Err(e) = persistence::save_agent_state(&conn, &snapshot).await {
tracing::error!(agent_id = %snapshot.agent_id, error = %e, "Failed to save agent state");
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
builder.mutate_on::<LoadAgentState>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting LoadAgentState - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let agent_id = envelope.message().agent_id.clone();
let reply = envelope.reply_envelope();
actor.model.metrics.state_loads += 1;
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
match persistence::load_agent_state(&conn, &agent_id).await {
Ok(snapshot) => {
reply.send(AgentStateLoaded { snapshot }).await;
}
Err(e) => {
tracing::error!(agent_id = %agent_id, error = %e, "Failed to load agent state");
}
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
}
fn configure_memory_handlers(builder: &mut ManagedActor<Idle, MemoryStore>) {
builder.mutate_on::<StoreMemory>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting StoreMemory - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let msg = envelope.message();
let agent_id = msg.agent_id.clone();
let content = msg.content.clone();
let embedding = msg.embedding.clone();
let reply = envelope.reply_envelope();
actor.model.metrics.memories_stored += 1;
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
let memory = match embedding {
Some(emb) => Memory::with_embedding(agent_id.clone(), content, emb),
None => Memory::new(agent_id.clone(), content),
};
match persistence::save_memory(&conn, &memory).await {
Ok(memory_id) => {
reply.send(MemoryStored { memory_id }).await;
}
Err(e) => {
tracing::error!(agent_id = %agent_id, error = %e, "Failed to store memory");
}
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
builder.mutate_on::<SearchMemories>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting SearchMemories - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let msg = envelope.message();
let agent_id = msg.agent_id.clone();
let query_embedding = msg.query_embedding.clone();
let limit = msg.limit;
let min_similarity = msg.min_similarity;
let reply = envelope.reply_envelope();
actor.model.metrics.memory_searches += 1;
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
match persistence::search_memories_by_embedding(
&conn,
&agent_id,
&query_embedding,
limit,
min_similarity,
)
.await
{
Ok(results) => {
reply.send(MemorySearchResults { results }).await;
}
Err(e) => {
tracing::error!(agent_id = %agent_id, error = %e, "Failed to search memories");
}
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
builder.mutate_on::<LoadMemories>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting LoadMemories - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let msg = envelope.message();
let agent_id = msg.agent_id.clone();
let limit = msg.limit;
let reply = envelope.reply_envelope();
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
match persistence::load_memories_for_agent(&conn, &agent_id, limit).await {
Ok(memories) => {
reply.send(MemoriesLoaded { memories }).await;
}
Err(e) => {
tracing::error!(agent_id = %agent_id, error = %e, "Failed to load memories");
}
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
builder.mutate_on::<DeleteMemory>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting DeleteMemory - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let memory_id = envelope.message().memory_id.clone();
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
if let Err(e) = persistence::delete_memory(&conn, &memory_id).await {
tracing::error!(memory_id = %memory_id, error = %e, "Failed to delete memory");
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
builder.mutate_on::<DeleteAgentMemories>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting DeleteAgentMemories - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let agent_id = envelope.message().agent_id.clone();
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
if let Err(e) = persistence::delete_memories_for_agent(&conn, &agent_id).await {
tracing::error!(agent_id = %agent_id, error = %e, "Failed to delete agent memories");
}
});
Reply::pending(async move {
let _ = handle.await;
})
});
builder.mutate_on::<GetContextWindow>(|actor, envelope| {
if actor.model.shutting_down {
tracing::warn!("Rejecting GetContextWindow - store is shutting down");
return Reply::ready();
}
let conn = actor.model.connection.clone();
let msg = envelope.message();
let agent_id = msg.agent_id.clone();
let system_prompt = msg.system_prompt.clone();
let conversation = msg.conversation.clone();
let query_embedding = msg.query_embedding.clone();
let max_tokens = msg.max_tokens;
let memory_limit = msg.memory_limit;
let reply = envelope.reply_envelope();
actor.model.metrics.context_windows_built += 1;
let handle = tokio::spawn(async move {
let Some(conn) = conn else {
tracing::error!("Memory Store not initialized");
return;
};
let memories = match query_embedding {
Some(ref emb) => {
match persistence::search_memories_by_embedding(
&conn,
&agent_id,
emb,
memory_limit,
Some(0.0), )
.await
{
Ok(results) => results.into_iter().map(|sm| sm.memory).collect(),
Err(e) => {
tracing::warn!(error = %e, "Failed to retrieve memories for context");
Vec::new()
}
}
}
None => Vec::new(),
};
let included_memories = memories.len();
let config = ContextWindowConfig::with_max_tokens(max_tokens);
let window = ContextWindow::new(config);
let messages = window.build_context(&system_prompt, &memories, &conversation);
let stats = window.get_context_stats(&messages);
reply
.send(ContextWindowResponse {
messages,
stats,
included_memories,
})
.await;
});
Reply::pending(async move {
let _ = handle.await;
})
});
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn memory_store_metrics_default() {
let metrics = MemoryStoreMetrics::default();
assert_eq!(metrics.conversations_created, 0);
assert_eq!(metrics.messages_saved, 0);
assert_eq!(metrics.conversations_loaded, 0);
assert_eq!(metrics.state_saves, 0);
assert_eq!(metrics.state_loads, 0);
}
}