pensyve-core 1.3.1

Universal memory runtime for AI agents — episodic, semantic, and procedural memory with 8-signal fusion retrieval
Documentation
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::types::{
    Edge, Entity, Episode, EpisodicMemory, Memory, Namespace, ObservationMemory, ProceduralMemory,
    SemanticMemory,
};

pub mod sqlite;

#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "postgres")]
pub use postgres::PostgresBackend;

// ---------------------------------------------------------------------------
// Error type
// ---------------------------------------------------------------------------

/// Errors surfaced by storage backends.
///
/// `Display` and `std::error::Error` are derived through `thiserror`; the
/// variants marked `#[from]` also get a `From` conversion, so the wrapped
/// error types can be propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
    /// An error reported by `rusqlite`.
    #[error("SQLite error: {0}")]
    Sqlite(#[from] rusqlite::Error),
    /// A `serde_json` (de)serialization failure.
    #[error("Serialization error: {0}")]
    Serde(#[from] serde_json::Error),
    /// Something that was looked up does not exist; the payload is a
    /// free-form description supplied by the code raising the error.
    #[error("Not found: {0}")]
    NotFound(String),
    /// A standard-library I/O error.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// A free-form failure message. The default `save_observation`
    /// implementation on `StorageTrait` uses this variant to report that
    /// the backend does not support the operation.
    #[error("Storage context: {0}")]
    Context(String),
    /// A mutex guarding backend state was poisoned; the payload carries a
    /// description of the failure.
    #[error("Mutex lock poisoned: {0}")]
    LockPoisoned(String),
}

/// Convenience alias: a `Result` whose error type is [`StorageError`].
pub type StorageResult<T> = Result<T, StorageError>;

// ---------------------------------------------------------------------------
// StorageTrait
// ---------------------------------------------------------------------------

/// Backend-agnostic persistence interface for namespaces, entities, episodes,
/// the memory kinds (episodic, semantic, procedural, observation), graph
/// edges, and activity logs.
///
/// Implementors must be `Send + Sync`. Every method takes `&self`, so a
/// backend that mutates state has to do so through interior mutability.
///
/// Methods with a body are default implementations; backends may override
/// them (typically for performance or to add real support for an optional
/// feature). All other methods are required.
pub trait StorageTrait: Send + Sync {
    // Namespaces

    /// Persist a namespace.
    fn save_namespace(&self, ns: &Namespace) -> StorageResult<()>;
    /// Fetch a namespace by id, if one is stored.
    fn get_namespace(&self, id: Uuid) -> StorageResult<Option<Namespace>>;
    /// Fetch a namespace by name, if one is stored.
    fn get_namespace_by_name(&self, name: &str) -> StorageResult<Option<Namespace>>;

    // Entities

    /// Persist an entity.
    fn save_entity(&self, entity: &Entity) -> StorageResult<()>;
    /// Fetch an entity by id, if one is stored.
    fn get_entity(&self, id: Uuid) -> StorageResult<Option<Entity>>;
    /// Fetch an entity by name within the given namespace, if one is stored.
    fn get_entity_by_name(&self, name: &str, namespace_id: Uuid) -> StorageResult<Option<Entity>>;

    // Episodes

    /// Persist an episode.
    fn save_episode(&self, episode: &Episode) -> StorageResult<()>;
    /// Fetch an episode by id, if one is stored.
    fn get_episode(&self, id: Uuid) -> StorageResult<Option<Episode>>;
    /// Write back changes to an already-stored episode.
    fn update_episode(&self, episode: &Episode) -> StorageResult<()>;

    // Episodic Memory

    /// Persist an episodic memory.
    fn save_episodic(&self, mem: &EpisodicMemory) -> StorageResult<()>;
    /// Fetch an episodic memory by id, if one is stored.
    fn get_episodic(&self, id: Uuid) -> StorageResult<Option<EpisodicMemory>>;
    /// List up to `limit` episodic memories about the given entity.
    fn list_episodic_by_entity(
        &self,
        about_entity: Uuid,
        limit: usize,
    ) -> StorageResult<Vec<EpisodicMemory>>;

    /// Fetch every episodic memory tied to the given episode, ordered by
    /// `event_time` (falling back to `timestamp`). Used by the observation
    /// extraction ingest hook — the extractor sees the full conversation,
    /// not just the turns the caller happens to still have in memory.
    ///
    /// Default implementation walks `get_all_memories_by_namespace` for
    /// backends that don't provide a direct index; override for performance.
    ///
    /// # Errors
    ///
    /// The default implementation propagates any error from
    /// `get_all_memories_by_namespace`.
    fn list_episodic_by_episode(
        &self,
        namespace_id: Uuid,
        episode_id: Uuid,
    ) -> StorageResult<Vec<EpisodicMemory>> {
        let all = self.get_all_memories_by_namespace(namespace_id)?;
        // Keep only the episodic memories that belong to the requested episode.
        let mut out: Vec<EpisodicMemory> = all
            .into_iter()
            .filter_map(|m| match m {
                Memory::Episodic(e) if e.episode_id == episode_id => Some(e),
                _ => None,
            })
            .collect();
        // Stable sort: entries with equal keys keep the order the backend
        // returned them in.
        out.sort_by_key(|e| e.event_time.unwrap_or(e.timestamp));
        Ok(out)
    }

    /// Store updated `stability` and `retrievability` values for the
    /// episodic memory `id`.
    fn update_episodic_access(
        &self,
        id: Uuid,
        stability: f32,
        retrievability: f32,
    ) -> StorageResult<()>;

    // Semantic Memory

    /// Persist a semantic memory.
    fn save_semantic(&self, mem: &SemanticMemory) -> StorageResult<()>;
    /// Fetch a semantic memory by id, if one is stored.
    fn get_semantic(&self, id: Uuid) -> StorageResult<Option<SemanticMemory>>;
    /// List up to `limit` semantic memories whose subject is the given entity.
    fn list_semantic_by_entity(
        &self,
        subject: Uuid,
        limit: usize,
    ) -> StorageResult<Vec<SemanticMemory>>;
    /// Mark the semantic memory `id` as invalidated. What invalidation means
    /// for later reads is defined by the backend.
    fn invalidate_semantic(&self, id: Uuid) -> StorageResult<()>;

    // Procedural Memory

    /// Persist a procedural memory.
    fn save_procedural(&self, mem: &ProceduralMemory) -> StorageResult<()>;
    /// Fetch a procedural memory by id, if one is stored.
    fn get_procedural(&self, id: Uuid) -> StorageResult<Option<ProceduralMemory>>;
    /// Store updated reliability statistics (`reliability`, `trial_count`,
    /// `success_count`) for the procedural memory `id`.
    fn update_procedural_reliability(
        &self,
        id: Uuid,
        reliability: f32,
        trial_count: u32,
        success_count: u32,
    ) -> StorageResult<()>;

    // Observation Memory (derived per-episode artifacts)
    //
    // Observations are extracted from episodic messages at ingest time and
    // surfaced at recall time by joining on the top-k episodes' IDs. They do
    // not participate in RRF candidate selection.
    //
    // The default read and delete implementations are no-ops (they return
    // `None`, an empty list, or a zero count) so existing backends keep
    // working without observation support. The default `save_observation`
    // instead returns an error, so a write to a backend without observation
    // support is reported rather than silently discarded.

    /// Persist an observation.
    ///
    /// # Errors
    ///
    /// The default implementation always returns `StorageError::Context`,
    /// signalling that the backend does not implement observation storage.
    fn save_observation(&self, _mem: &ObservationMemory) -> StorageResult<()> {
        Err(StorageError::Context(
            "save_observation not implemented on this backend".into(),
        ))
    }

    /// Fetch an observation by id. The default implementation always
    /// returns `Ok(None)`.
    fn get_observation(&self, _id: Uuid) -> StorageResult<Option<ObservationMemory>> {
        Ok(None)
    }

    /// Fetch all observations attached to any of the given episode IDs,
    /// bounded by `limit` (applied after fetch). Used by `recall_grouped` to
    /// attach observations to top-k session groups.
    ///
    /// The default implementation always returns an empty list.
    fn list_observations_by_episode_ids(
        &self,
        _episode_ids: &[Uuid],
        _limit: usize,
    ) -> StorageResult<Vec<ObservationMemory>> {
        Ok(Vec::new())
    }

    /// Delete every observation tied to the given episode. Returns the row count.
    /// Called as part of episode cascade-delete paths.
    ///
    /// The default implementation deletes nothing and returns `Ok(0)`.
    fn delete_observations_by_episode(&self, _episode_id: Uuid) -> StorageResult<usize> {
        Ok(0)
    }

    /// Delete every observation whose source episode is associated with the
    /// given entity (either as `source_entity` or `about_entity` on an
    /// episodic memory). Used by GDPR cascade-delete paths — called BEFORE
    /// `delete_memories_by_entity` so the episodic→entity join still exists.
    ///
    /// Returns the row count of deleted observations. The default
    /// implementation deletes nothing and returns `Ok(0)`.
    fn delete_observations_by_entity(&self, _entity_id: Uuid) -> StorageResult<usize> {
        Ok(0)
    }

    // Full-text search (BM25)

    /// Full-text search over the memories of a namespace, returning at most
    /// `limit` matches.
    fn search_fts(
        &self,
        query: &str,
        namespace_id: Uuid,
        limit: usize,
    ) -> StorageResult<Vec<Memory>>;

    /// Entity-scoped full-text search.
    ///
    /// Like `search_fts`, but only returns semantic memories whose `subject`
    /// matches `entity_id` and episodic memories whose `about_entity` or
    /// `source_entity` matches `entity_id`. Procedural memories are excluded
    /// (they are project-agnostic).
    fn search_fts_scoped(
        &self,
        query: &str,
        namespace_id: Uuid,
        entity_id: Uuid,
        limit: usize,
    ) -> StorageResult<Vec<Memory>>;

    // Bulk

    /// Load every memory stored in the given namespace.
    fn get_all_memories_by_namespace(&self, namespace_id: Uuid) -> StorageResult<Vec<Memory>>;

    // Deletion

    /// Delete the memories associated with the given entity. Returns the
    /// number of memories deleted.
    fn delete_memories_by_entity(&self, entity_id: Uuid) -> StorageResult<usize>;

    /// Delete a single memory by its UUID (episodic, semantic, or procedural).
    fn delete_memory_by_id(&self, id: Uuid) -> StorageResult<bool>;

    /// Delete all memories in a namespace. Returns the count of deleted memories.
    ///
    /// The default implementation loads every memory in the namespace and
    /// deletes them one by one through `delete_memory_by_id`; backends with a
    /// bulk-delete primitive should override it.
    ///
    /// # Errors
    ///
    /// Returns immediately if the memories cannot be loaded. If an individual
    /// delete fails, the remaining memories are still attempted (so as much
    /// as possible is purged) and the first error encountered is returned
    /// afterwards instead of a count — a partially failed purge is never
    /// reported as success.
    fn purge_namespace(&self, namespace_id: Uuid) -> StorageResult<usize> {
        let memories = self.get_all_memories_by_namespace(namespace_id)?;
        let mut deleted = 0;
        let mut first_failure: Option<StorageError> = None;
        for mem in &memories {
            match self.delete_memory_by_id(mem.id()) {
                Ok(true) => deleted += 1,
                // Already gone (e.g. removed concurrently): nothing to count.
                Ok(false) => {}
                // Remember only the first failure, but keep purging the rest.
                Err(err) => {
                    first_failure.get_or_insert(err);
                }
            }
        }
        match first_failure {
            Some(err) => Err(err),
            None => Ok(deleted),
        }
    }

    /// Update a semantic memory's content and/or confidence.
    fn update_semantic_content(
        &self,
        id: Uuid,
        predicate: &str,
        object: &str,
        confidence: Option<f32>,
    ) -> StorageResult<()>;

    /// Delete an entity record by its UUID. Returns true if the entity was found and deleted.
    fn delete_entity(&self, id: Uuid) -> StorageResult<bool>;

    // Entities (bulk)

    /// List every entity stored in the given namespace.
    fn list_entities_by_namespace(&self, namespace_id: Uuid) -> StorageResult<Vec<Entity>>;

    // Edges

    /// Persist a graph edge.
    fn save_edge(&self, edge: &Edge) -> StorageResult<()>;
    /// List the edges attached to the given entity.
    fn get_edges_for_entity(&self, entity_id: Uuid) -> StorageResult<Vec<Edge>>;

    // Counts (lightweight, no embedding pipeline)

    /// Count memories by type for a namespace without loading memory content.
    ///
    /// The tuple is ordered `(episodic, semantic, procedural)`.
    fn count_memories_by_namespace(
        &self,
        namespace_id: Uuid,
    ) -> StorageResult<(usize, usize, usize)>;

    /// Count entities in a namespace.
    fn count_entities_by_namespace(&self, namespace_id: Uuid) -> StorageResult<usize>;

    // Activity logging

    /// Record an activity event (recall, remember, observe, forget, etc.).
    fn log_activity(
        &self,
        namespace_id: Uuid,
        event_type: &str,
        detail: &serde_json::Value,
    ) -> StorageResult<()>;

    /// Aggregate activity counts by day for the last N days.
    fn get_activity_aggregates(
        &self,
        namespace_id: Uuid,
        days: u32,
    ) -> StorageResult<Vec<ActivityAggregate>>;

    /// Retrieve the most recent activity events.
    fn get_recent_activity(
        &self,
        namespace_id: Uuid,
        limit: usize,
    ) -> StorageResult<Vec<ActivityEvent>>;
}

// ---------------------------------------------------------------------------
// Activity event types
// ---------------------------------------------------------------------------

/// A single logged activity event, as returned by
/// `StorageTrait::get_recent_activity`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActivityEvent {
    /// Identifier of this event.
    pub id: Uuid,
    /// Kind of event; `StorageTrait::log_activity` lists recall, remember,
    /// observe and forget as examples.
    pub event_type: String,
    /// Namespace the event belongs to.
    pub namespace_id: Uuid,
    /// Arbitrary JSON detail attached to the event — presumably the `detail`
    /// value passed to `log_activity`; confirm against the backend.
    pub detail_json: serde_json::Value,
    /// UTC time associated with the event's creation.
    pub created_at: DateTime<Utc>,
}

/// Per-day activity counts, as returned by
/// `StorageTrait::get_activity_aggregates`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActivityAggregate {
    /// The day this row covers, as a string. NOTE(review): the exact format
    /// is chosen by the backend and is not visible here — confirm before
    /// parsing.
    pub date: String,
    /// Number of recall events on that day.
    pub recalls: usize,
    /// Number of remember events on that day.
    pub remembers: usize,
    /// Number of observe events on that day.
    pub observes: usize,
    /// Number of forget events on that day.
    pub forgets: usize,
}