

Struct GraphStore 

pub struct GraphStore { /* private fields */ }

Implementations


impl GraphStore


pub fn new(pool: SqlitePool) -> Self


pub fn pool(&self) -> &SqlitePool


pub async fn upsert_entity( &self, surface_name: &str, canonical_name: &str, entity_type: EntityType, summary: Option<&str>, ) -> Result<i64, MemoryError>

Insert or update an entity by (canonical_name, entity_type).

  • surface_name: the original display form (e.g. "Rust") — stored in the name column so user-facing output preserves casing. Updated on every upsert to the latest seen form.
  • canonical_name: the stable normalized key (e.g. "rust") — used for deduplication.
  • summary: pass None to preserve the existing summary; pass Some("") to blank it.
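The upsert rules above can be illustrated with an in-memory model keyed by (canonical_name, entity_type). This is a sketch under stated assumptions, not GraphStore's SQLite implementation. EntityTable, Row, and the two EntityType variants are hypothetical. Some("") is modelled as storing an empty string rather than NULL.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the crate's EntityType (the real enum differs).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum EntityType {
    Concept,
    Tool,
}

#[derive(Clone, Debug, PartialEq)]
pub struct Row {
    pub id: i64,
    /// Latest surface form seen, e.g. "Rust".
    pub name: String,
    pub summary: Option<String>,
}

/// In-memory model of the upsert rules; the real store uses SQLite.
#[derive(Default)]
pub struct EntityTable {
    rows: HashMap<(String, EntityType), Row>,
    next_id: i64,
}

impl EntityTable {
    pub fn upsert(&mut self, surface: &str, canonical: &str, ty: EntityType, summary: Option<&str>) -> i64 {
        let key = (canonical.to_string(), ty);
        if let Some(row) = self.rows.get_mut(&key) {
            // Existing entity: always refresh the display form.
            row.name = surface.to_string();
            // None keeps the stored summary; Some(..) (including "") overwrites it.
            if let Some(s) = summary {
                row.summary = Some(s.to_string());
            }
            return row.id;
        }
        // New (canonical_name, entity_type) pair: allocate a fresh id.
        self.next_id += 1;
        let row = Row { id: self.next_id, name: surface.to_string(), summary: summary.map(str::to_string) };
        self.rows.insert(key, row);
        self.next_id
    }

    pub fn get(&self, canonical: &str, ty: EntityType) -> Option<&Row> {
        self.rows.get(&(canonical.to_string(), ty))
    }
}
```

The same canonical name under a different entity type yields a separate row, because the type is part of the key.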
§Errors

Returns an error if the database query fails.


pub async fn find_entity( &self, canonical_name: &str, entity_type: EntityType, ) -> Result<Option<Entity>, MemoryError>

Find an entity by exact canonical name and type.

§Errors

Returns an error if the database query fails.


pub async fn find_entity_by_id( &self, entity_id: i64, ) -> Result<Option<Entity>, MemoryError>

Find an entity by its numeric ID.

§Errors

Returns an error if the database query fails.


pub async fn set_entity_qdrant_point_id( &self, entity_id: i64, point_id: &str, ) -> Result<(), MemoryError>

Update the qdrant_point_id for an entity.

§Errors

Returns an error if the database query fails.


pub async fn find_entities_fuzzy( &self, query: &str, limit: usize, ) -> Result<Vec<Entity>, MemoryError>

Find entities matching query in name, summary, or aliases, up to limit results, ranked by relevance.

Uses FTS5 MATCH with prefix wildcards (token*) and bm25 ranking. Name matches are weighted 10x higher than summary matches. Also searches graph_entity_aliases for alias matches via a UNION query.

§Behavioral note

This replaces the previous LIKE '%query%' implementation. FTS5 prefix matching differs from substring matching: searching “SQL” matches “SQLite” (prefix) but NOT “GraphQL” (substring). The unicode61 tokenizer splits text into words at non-alphanumeric characters and indexes each word as a single token, so a match must start at the beginning of a word; mid-word substrings are not matched. This is a known trade-off for index performance.

Single-character queries (e.g., “a”) are allowed and produce a broad prefix match (“a*”). The limit parameter caps the result set. No minimum query length is enforced; if this causes noise in practice, add a minimum length guard at the call site.
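The prefix-versus-substring behaviour can be reproduced in a few lines of plain Rust. This is an approximation, not FTS5 itself. tokenize only splits on non-alphanumeric characters and lowercases; the real unicode61 tokenizer also folds diacritics. fts5_prefix_expr assumes query words are combined with FTS5's implicit AND. All three function names are hypothetical.

```rust
/// Rough approximation of FTS5's unicode61 tokenizer: split on any
/// non-alphanumeric character and fold to lowercase.
pub fn tokenize(text: &str) -> Vec<String> {
    text.split(|c: char| !c.is_alphanumeric())
        .filter(|t| !t.is_empty())
        .map(|t| t.to_lowercase())
        .collect()
}

/// True if every query token is a prefix of some token in `name`,
/// mimicking an implicit-AND FTS5 query made of `token*` terms.
pub fn prefix_matches(name: &str, query: &str) -> bool {
    let name_tokens = tokenize(name);
    let query_tokens = tokenize(query);
    !query_tokens.is_empty()
        && query_tokens
            .iter()
            .all(|q| name_tokens.iter().any(|n| n.starts_with(q.as_str())))
}

/// Build an FTS5 MATCH expression such as `"sql"* "lite"*`.
/// Each word is double-quoted (embedded quotes doubled) so FTS5 operators
/// typed by the user are treated as literal text; `*` marks a prefix token.
pub fn fts5_prefix_expr(query: &str) -> String {
    query
        .split_whitespace()
        .map(|t| format!("\"{}\"*", t.replace('"', "\"\"")))
        .collect::<Vec<_>>()
        .join(" ")
}
```

"GraphQL" is a single token, "graphql", which does not start with "sql". That is why the prefix query misses it where LIKE '%sql%' would have matched.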

§Errors

Returns an error if the database query fails.


pub fn all_entities_stream( &self, ) -> impl Stream<Item = Result<Entity, MemoryError>> + '_

Stream all entities from the database incrementally (true cursor, no full-table load).


pub async fn add_alias( &self, entity_id: i64, alias_name: &str, ) -> Result<(), MemoryError>

Insert an alias for an entity (idempotent: duplicate alias is silently ignored via UNIQUE constraint).

§Errors

Returns an error if the database query fails.


pub async fn find_entity_by_alias( &self, alias_name: &str, entity_type: EntityType, ) -> Result<Option<Entity>, MemoryError>

Find an entity by alias name and entity type (case-insensitive).

Filters by entity_type to avoid cross-type alias collisions (S2 fix).

§Errors

Returns an error if the database query fails.


pub async fn aliases_for_entity( &self, entity_id: i64, ) -> Result<Vec<EntityAlias>, MemoryError>

Get all aliases for an entity.

§Errors

Returns an error if the database query fails.


pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError>

Collect all entities into a Vec.

§Errors

Returns an error if the database query fails or entity_type parsing fails.


pub async fn entity_count(&self) -> Result<i64, MemoryError>

Count the total number of entities.

§Errors

Returns an error if the database query fails.


pub async fn insert_edge( &self, source_entity_id: i64, target_entity_id: i64, relation: &str, fact: &str, confidence: f32, episode_id: Option<MessageId>, ) -> Result<i64, MemoryError>

Insert a new edge between two entities, or update the existing active edge.

An active edge is identified by (source_entity_id, target_entity_id, relation, edge_type) with valid_to IS NULL. If such an edge already exists, its confidence is updated to the maximum of the stored and incoming values, and the existing id is returned. This prevents duplicate edges from repeated extraction of the same context messages.

The dedup key includes edge_type (critic mitigation): the same (source, target, relation) triple can legitimately exist with different edge types (e.g., depends_on can be both Semantic and Causal). Without edge_type in the key, the second insertion would silently update the first and lose the type classification.
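The dedup rule can be modelled as a map from the four-part key to the active edge. The hypothetical ActiveEdges type below holds only active edges (the valid_to IS NULL set). The two EdgeType variants are stand-ins for the crate's real enum.

```rust
use std::collections::HashMap;

/// Hypothetical subset of edge types, for illustration only.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum EdgeType {
    Semantic,
    Causal,
}

pub struct ActiveEdge {
    pub id: i64,
    pub confidence: f32,
}

/// Active edges keyed by (source, target, relation, edge_type).
#[derive(Default)]
pub struct ActiveEdges {
    by_key: HashMap<(i64, i64, String, EdgeType), ActiveEdge>,
    next_id: i64,
}

impl ActiveEdges {
    pub fn insert(&mut self, src: i64, tgt: i64, relation: &str, confidence: f32, ty: EdgeType) -> i64 {
        let key = (src, tgt, relation.to_string(), ty);
        if let Some(edge) = self.by_key.get_mut(&key) {
            // Same active edge extracted again: keep the higher confidence, reuse the id.
            edge.confidence = edge.confidence.max(confidence);
            return edge.id;
        }
        self.next_id += 1;
        self.by_key.insert(key, ActiveEdge { id: self.next_id, confidence });
        self.next_id
    }

    pub fn confidence(&self, id: i64) -> Option<f32> {
        self.by_key.values().find(|e| e.id == id).map(|e| e.confidence)
    }
}
```

Because edge_type is part of the key, a Causal insertion of the same triple gets its own id instead of overwriting the Semantic edge.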

§Errors

Returns an error if the database query fails.


pub async fn insert_edge_typed( &self, source_entity_id: i64, target_entity_id: i64, relation: &str, fact: &str, confidence: f32, episode_id: Option<MessageId>, edge_type: EdgeType, ) -> Result<i64, MemoryError>

Insert a typed edge between two entities, or update the existing active edge of the same type.

Identical semantics to insert_edge, but with an explicit edge_type parameter. The dedup key is (source_entity_id, target_entity_id, relation, edge_type), matched only against active edges (valid_to IS NULL).

§Errors

Returns an error if the database query fails.


pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError>

Mark an edge as invalid (set valid_to and expired_at to now).

§Errors

Returns an error if the database update fails.


pub async fn edges_for_entities( &self, entity_ids: &[i64], edge_types: &[EdgeType], ) -> Result<Vec<Edge>, MemoryError>

Get all active edges for a batch of entity IDs, with optional MAGMA edge type filtering.

Fetches all currently active edges (valid_to IS NULL) where either endpoint is in entity_ids. Only current edges are considered: there is no at_timestamp support in v1 (see bfs_at_timestamp for historical traversal).

§SQLite bind limit safety

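SQLite limits the number of bind parameters per statement to SQLITE_MAX_VARIABLE_NUMBER. The default is 999 in SQLite versions before 3.32.0 and 32766 from 3.32.0 onward. Each entity ID requires two bind slots (one for the source match, one for the target match). Batches are therefore chunked at MAX_BATCH_ENTITIES = 490, which uses at most 2 × 490 = 980 slots. That stays under the older 999 default, so the query is safe on both older and newer SQLite builds.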
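The chunking arithmetic can be sketched as follows. Only the constant 490 and the two-slots-per-ID rule come from the text. The shape of the SQL fragment and the function name chunked_where_clauses are assumptions.

```rust
/// Chunk size quoted in the text above.
pub const MAX_BATCH_ENTITIES: usize = 490;

/// Build one WHERE fragment per chunk, paired with the number of bind
/// parameters it needs. Each ID is bound twice (source and target), so a
/// full chunk needs 2 * 490 = 980 parameters.
pub fn chunked_where_clauses(entity_ids: &[i64]) -> Vec<(String, usize)> {
    entity_ids
        .chunks(MAX_BATCH_ENTITIES)
        .map(|chunk| {
            let placeholders = vec!["?"; chunk.len()].join(", ");
            let clause = format!(
                "source_entity_id IN ({p}) OR target_entity_id IN ({p})",
                p = placeholders
            );
            (clause, chunk.len() * 2)
        })
        .collect()
}
```

For 1,000 IDs this produces chunks of 490, 490, and 20 IDs. None needs more than 980 parameters.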

§Errors

Returns an error if the database query fails.


pub async fn edges_for_entity( &self, entity_id: i64, ) -> Result<Vec<Edge>, MemoryError>

Get all active edges where entity is source or target.

§Errors

Returns an error if the database query fails.


pub async fn edge_history_for_entity( &self, entity_id: i64, limit: usize, ) -> Result<Vec<Edge>, MemoryError>

Get up to limit edges (active and expired) where the entity is the source or target, ordered by valid_from DESC. Used by the /graph history <name> slash command.

§Errors

Returns an error if the database query fails or if limit overflows i64.


pub async fn edges_between( &self, entity_a: i64, entity_b: i64, ) -> Result<Vec<Edge>, MemoryError>

Get all active edges between two entities (both directions).

§Errors

Returns an error if the database query fails.


pub async fn edges_exact( &self, source_entity_id: i64, target_entity_id: i64, ) -> Result<Vec<Edge>, MemoryError>

Get active edges from source to target in the exact direction (no reverse).

§Errors

Returns an error if the database query fails.


pub async fn active_edge_count(&self) -> Result<i64, MemoryError>

Count active (non-invalidated) edges.

§Errors

Returns an error if the database query fails.


pub async fn edge_type_distribution( &self, ) -> Result<Vec<(String, i64)>, MemoryError>

Return per-type active edge counts as (edge_type, count) pairs.

§Errors

Returns an error if the database query fails.


pub async fn upsert_community( &self, name: &str, summary: &str, entity_ids: &[i64], fingerprint: Option<&str>, ) -> Result<i64, MemoryError>

Insert or update a community by name.

fingerprint is a BLAKE3 hex string computed from sorted entity IDs and intra-community edge IDs. Pass None to leave the fingerprint unchanged (e.g. when assign_to_community adds an entity without a full re-detection pass).
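Sorting before hashing makes the fingerprint independent of the order in which entities and edges were discovered. The sketch below shows that idea but swaps in std's DefaultHasher (SipHash) for BLAKE3, purely to avoid a dependency. DefaultHasher output is not guaranteed stable across Rust releases, so it is unsuitable for values stored in the database. community_fingerprint is a hypothetical name.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Order-independent fingerprint of a community partition.
/// NOTE: uses DefaultHasher instead of BLAKE3 for illustration only.
pub fn community_fingerprint(entity_ids: &[i64], edge_ids: &[i64]) -> String {
    let mut entities = entity_ids.to_vec();
    let mut edges = edge_ids.to_vec();
    // Sorting removes any dependence on discovery order.
    entities.sort_unstable();
    edges.sort_unstable();
    let mut hasher = DefaultHasher::new();
    // Hashing a Vec includes its length, so ([1, 2], [3]) and ([1], [2, 3]) differ.
    entities.hash(&mut hasher);
    edges.hash(&mut hasher);
    format!("{:016x}", hasher.finish())
}
```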

§Errors

Returns an error if the database query fails or JSON serialization fails.


pub async fn community_fingerprints( &self, ) -> Result<HashMap<String, i64>, MemoryError>

Return a map of fingerprint -> community_id for all communities with a non-NULL fingerprint. Used by detect_communities to skip unchanged partitions.

§Errors

Returns an error if the database query fails.


pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError>

Delete a single community by its primary key.

§Errors

Returns an error if the database query fails.


pub async fn clear_community_fingerprint( &self, id: i64, ) -> Result<(), MemoryError>

Set the fingerprint of a community to NULL, invalidating the incremental cache.

Used by assign_to_community when an entity is added without a full re-detection pass, ensuring the next detect_communities run re-summarizes the affected community.

§Errors

Returns an error if the database query fails.


pub async fn community_for_entity( &self, entity_id: i64, ) -> Result<Option<Community>, MemoryError>

Find the first community that contains the given entity_id.

Uses json_each() to evaluate membership inside SQLite, so community rows do not have to be loaded and JSON-parsed one by one in Rust.

§Errors

Returns an error if the database query fails or JSON parsing fails.


pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError>

Get all communities.

§Errors

Returns an error if the database query fails or JSON parsing fails.


pub async fn community_count(&self) -> Result<i64, MemoryError>

Count the total number of communities.

§Errors

Returns an error if the database query fails.


pub async fn get_metadata( &self, key: &str, ) -> Result<Option<String>, MemoryError>

Get a metadata value by key.

§Errors

Returns an error if the database query fails.


pub async fn set_metadata( &self, key: &str, value: &str, ) -> Result<(), MemoryError>

Set a metadata value by key (upsert).

§Errors

Returns an error if the database query fails.


pub async fn extraction_count(&self) -> Result<i64, MemoryError>

Get the current extraction count from metadata.

Returns 0 if the counter has not been initialized.

§Errors

Returns an error if the database query fails.


pub fn all_active_edges_stream( &self, ) -> impl Stream<Item = Result<Edge, MemoryError>> + '_

Stream all active (non-invalidated) edges.


pub async fn edges_after_id( &self, after_id: i64, limit: i64, ) -> Result<Vec<Edge>, MemoryError>

Fetch a chunk of active edges using keyset pagination.

Returns edges with id > after_id in ascending order, up to limit rows. Starting with after_id = 0 returns the first chunk. Pass the last id from the returned chunk as after_id for the next page. An empty result means all edges have been consumed.

Keyset pagination costs one index seek on id plus the rows returned, wherever the page falls. OFFSET, by contrast, must scan and discard every skipped row, so its cost grows linearly with the offset. Keyset pagination is also stable under concurrent inserts. New edges get monotonically higher IDs, so they either appear in a later chunk or land beyond the final one; they never cause duplicates. A concurrent invalidation (setting valid_to) may cause a single edge to be skipped. This is acceptable because LPA operates on an eventually consistent snapshot.
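A caller that drains every active edge would loop as below. page_after is a hypothetical in-memory stand-in for edges_after_id: it scans a sorted slice rather than seeking an index. EdgeRow keeps only the id.

```rust
/// Minimal edge record for the sketch.
#[derive(Clone, Debug, PartialEq)]
pub struct EdgeRow {
    pub id: i64,
}

/// Stand-in for edges_after_id: rows with id > after_id, ascending, at most `limit`.
/// `table` must already be sorted by id.
pub fn page_after(table: &[EdgeRow], after_id: i64, limit: usize) -> Vec<EdgeRow> {
    table.iter().filter(|e| e.id > after_id).take(limit).cloned().collect()
}

/// Drain all edges page by page, feeding each page's last id back in.
pub fn load_all(table: &[EdgeRow], page_size: usize) -> Vec<EdgeRow> {
    let mut out = Vec::new();
    let mut after_id = 0; // 0 returns the first chunk
    loop {
        let page = page_after(table, after_id, page_size);
        // An empty page means every edge has been consumed.
        let Some(last) = page.last() else { break };
        after_id = last.id;
        out.extend(page);
    }
    out
}
```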

§Errors

Returns an error if the database query fails.


pub async fn find_community_by_id( &self, id: i64, ) -> Result<Option<Community>, MemoryError>

Find a community by its primary key.

§Errors

Returns an error if the database query fails or JSON parsing fails.


pub async fn delete_all_communities(&self) -> Result<(), MemoryError>

Delete all communities so they can be rebuilt from scratch before re-upserting.

§Errors

Returns an error if the database query fails.


pub async fn delete_expired_edges( &self, retention_days: u32, ) -> Result<usize, MemoryError>

Delete expired edges older than retention_days days and return the number of edges deleted.

§Errors

Returns an error if the database query fails.


pub async fn delete_orphan_entities( &self, retention_days: u32, ) -> Result<usize, MemoryError>

Delete orphan entities (no active edges, last seen more than retention_days ago).

§Errors

Returns an error if the database query fails.


pub async fn cap_entities( &self, max_entities: usize, ) -> Result<usize, MemoryError>

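Delete excess entities when the entity count exceeds max_entities.

Entities are ranked for deletion by ascending edge count, then by ascending last_seen_at (least recently seen first), so weakly connected, stale entities are removed first. Deletion only happens when entity_count() > max_entities.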
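The eviction order can be expressed as a two-key sort. Candidate and ids_to_evict are hypothetical. The sketch assumes last_seen_at is stored in the "YYYY-MM-DD HH:MM:SS" text format used elsewhere on this page, which sorts chronologically as plain text.

```rust
/// Candidate row for the eviction sketch; field names are illustrative.
#[derive(Clone, Debug)]
pub struct Candidate {
    pub id: i64,
    pub edge_count: u32,
    /// "YYYY-MM-DD HH:MM:SS"; lexicographic order equals chronological order.
    pub last_seen_at: String,
}

/// Ids that would be deleted to bring the count down to `max_entities`.
pub fn ids_to_evict(mut entities: Vec<Candidate>, max_entities: usize) -> Vec<i64> {
    if entities.len() <= max_entities {
        return Vec::new(); // nothing is deleted unless count > max_entities
    }
    let excess = entities.len() - max_entities;
    // Fewest edges first; ties broken by least recently seen (LRU).
    entities.sort_by(|a, b| {
        a.edge_count
            .cmp(&b.edge_count)
            .then_with(|| a.last_seen_at.cmp(&b.last_seen_at))
    });
    entities.into_iter().take(excess).map(|e| e.id).collect()
}
```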

§Errors

Returns an error if the database query fails.


pub async fn edges_at_timestamp( &self, entity_id: i64, timestamp: &str, ) -> Result<Vec<Edge>, MemoryError>

Return all edges for entity_id (as source or target) that were valid at timestamp.

An edge is valid at timestamp when:

  • valid_from <= timestamp, AND
  • valid_to IS NULL (open-ended) OR valid_to > timestamp.

timestamp must be a SQLite datetime string: "YYYY-MM-DD HH:MM:SS".
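"YYYY-MM-DD HH:MM:SS" strings are fixed-width with the most significant field first, so comparing them as text gives chronological order. That property lets SQLite evaluate the validity predicate with plain string comparisons. A minimal Rust version of the same predicate follows. valid_at is a hypothetical helper, and the end bound is exclusive, as in valid_to > timestamp.

```rust
/// Mirrors: valid_from <= ts AND (valid_to IS NULL OR valid_to > ts).
/// All arguments must use the same "YYYY-MM-DD HH:MM:SS" format.
pub fn valid_at(valid_from: &str, valid_to: Option<&str>, ts: &str) -> bool {
    valid_from <= ts && valid_to.map_or(true, |end| end > ts)
}
```

Mixing formats breaks the ordering. An ISO 8601 value with a T separator sorts after the space-separated form of the same instant, because 'T' compares greater than ' '.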

§Errors

Returns an error if the database query fails.


pub async fn edge_history( &self, source_entity_id: i64, predicate: &str, relation: Option<&str>, limit: usize, ) -> Result<Vec<Edge>, MemoryError>

Return up to limit edge versions (active and expired) for the given (source, predicate) pair.

The optional relation filter restricts results to a specific relation label. Results are ordered by valid_from DESC (most recent first).

§Errors

Returns an error if the database query fails.


pub async fn bfs( &self, start_entity_id: i64, max_hops: u32, ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError>

Breadth-first traversal from start_entity_id up to max_hops hops.

Returns all reachable entities and the active edges connecting them. Implements BFS iteratively in Rust to guarantee cycle safety regardless of SQLite CTE limitations.

SQLite bind parameter limit: each BFS hop binds the frontier IDs three times in the neighbour query, so one hop needs 3 × (frontier size) bind slots. Under a limit of 999, a frontier of more than 333 entities exceeds it. SQLITE_MAX_VARIABLE_NUMBER defaults to 999 in SQLite versions before 3.32.0 and to 32766 from 3.32.0 onward. This is acceptable for Phase 1 (small graphs, max_hops typically 2–3). For large graphs, consider batching or a temp-table approach.

§Errors

Returns an error if any database query fails.


pub async fn bfs_with_depth( &self, start_entity_id: i64, max_hops: u32, ) -> Result<(Vec<Entity>, Vec<Edge>, HashMap<i64, u32>), MemoryError>

BFS traversal returning entities, edges, and a depth map (entity_id → hop distance).

The depth map records the minimum hop distance from start_entity_id to each visited entity. The start entity itself has depth 0.
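The hop-by-hop structure and the depth map can be sketched in memory. bfs_depths is hypothetical and walks an adjacency list instead of querying SQLite. Each iteration of the outer loop corresponds roughly to one neighbour query over the current frontier. Recording a depth the first time an entity is seen yields the minimum hop distance and also makes cycles harmless.

```rust
use std::collections::HashMap;

/// Level-by-level BFS returning entity_id -> minimum hop distance.
pub fn bfs_depths(adj: &HashMap<i64, Vec<i64>>, start: i64, max_hops: u32) -> HashMap<i64, u32> {
    let mut depth: HashMap<i64, u32> = HashMap::from([(start, 0)]);
    let mut frontier = vec![start];
    for hop in 1..=max_hops {
        let mut next_frontier = Vec::new();
        // Expand the whole frontier at once (one "neighbour query" per hop).
        for node in &frontier {
            for &n in adj.get(node).into_iter().flatten() {
                // First visit is the shortest; revisits (cycles) are ignored.
                if !depth.contains_key(&n) {
                    depth.insert(n, hop);
                    next_frontier.push(n);
                }
            }
        }
        if next_frontier.is_empty() {
            break; // nothing new is reachable
        }
        frontier = next_frontier;
    }
    depth
}
```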

SQLite bind parameter limit: see bfs for notes on frontier size limits.

§Errors

Returns an error if any database query fails.


pub async fn bfs_at_timestamp( &self, start_entity_id: i64, max_hops: u32, timestamp: &str, ) -> Result<(Vec<Entity>, Vec<Edge>, HashMap<i64, u32>), MemoryError>

BFS traversal considering only edges that were valid at timestamp.

Equivalent to bfs_with_depth, but replaces the valid_to IS NULL filter with the temporal range predicate valid_from <= ts AND (valid_to IS NULL OR valid_to > ts).

timestamp must be a SQLite datetime string: "YYYY-MM-DD HH:MM:SS".

§Errors

Returns an error if any database query fails.


pub async fn bfs_typed( &self, start_entity_id: i64, max_hops: u32, edge_types: &[EdgeType], ) -> Result<(Vec<Entity>, Vec<Edge>, HashMap<i64, u32>), MemoryError>

BFS traversal scoped to specific MAGMA edge types.

When edge_types is empty, behaves identically to bfs_with_depth (traverses all active edges). When edge_types is non-empty, only traverses edges whose edge_type matches one of the provided types.

This enables subgraph-scoped retrieval: a causal query traverses only causal + semantic edges, a temporal query only temporal + semantic edges, etc.

Note: Semantic is typically included in edge_types by the caller to ensure recall is never worse than the untyped BFS. See classify_graph_subgraph in router.rs.
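The empty-means-all rule for edge_types reduces to a one-line predicate. edge_allowed and traversable are hypothetical helpers. The three EdgeType variants are stand-ins limited to the types named in this section.

```rust
/// Hypothetical edge-type enum for the sketch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EdgeType {
    Semantic,
    Temporal,
    Causal,
}

/// An empty filter admits every edge; otherwise only the listed types pass.
pub fn edge_allowed(edge_type: EdgeType, filter: &[EdgeType]) -> bool {
    filter.is_empty() || filter.contains(&edge_type)
}

/// (source, target) pairs a typed traversal would be allowed to follow.
pub fn traversable(edges: &[(i64, i64, EdgeType)], filter: &[EdgeType]) -> Vec<(i64, i64)> {
    edges
        .iter()
        .filter(|(_, _, t)| edge_allowed(*t, filter))
        .map(|&(s, d, _)| (s, d))
        .collect()
}
```

A causal query that also passes Semantic keeps every Semantic edge, so it can only reach a subset of what the untyped traversal reaches.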

§Errors

Returns an error if any database query fails.


pub async fn find_entity_by_name( &self, name: &str, ) -> Result<Vec<Entity>, MemoryError>

Find an entity by name only (no type filter).

Uses a two-phase lookup to ensure exact name matches are always prioritised:

  1. Exact case-insensitive match on name or canonical_name.
  2. If no exact match found, falls back to FTS5 prefix search (see find_entities_fuzzy).

This prevents FTS5 from returning a different entity whose summary mentions the searched name (e.g. searching “Alice” returning “Google” because Google’s summary contains “Alice”).
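The two-phase lookup can be sketched over an in-memory slice. find_by_name and the Entity fields here are hypothetical. The fallback phase uses a simple word-prefix check over name and summary as a stand-in for the FTS5 query.

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct Entity {
    pub name: String,
    pub canonical_name: String,
    pub summary: String,
}

/// Phase 1: exact case-insensitive match on name or canonical_name.
/// Phase 2 (only if phase 1 is empty): word-prefix match over name and summary.
pub fn find_by_name<'a>(entities: &'a [Entity], query: &str) -> Vec<&'a Entity> {
    let q = query.to_lowercase();
    let exact: Vec<&Entity> = entities
        .iter()
        .filter(|e| e.name.to_lowercase() == q || e.canonical_name.to_lowercase() == q)
        .collect();
    if !exact.is_empty() {
        return exact; // an exact hit always wins over summary mentions
    }
    entities
        .iter()
        .filter(|e| {
            e.name
                .split_whitespace()
                .chain(e.summary.split_whitespace())
                .any(|w| w.to_lowercase().starts_with(q.as_str()))
        })
        .collect()
}
```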

§Errors

Returns an error if the database query fails.


pub async fn unprocessed_messages_for_backfill( &self, limit: usize, ) -> Result<Vec<(MessageId, String)>, MemoryError>

Return up to limit messages that have not yet been processed by graph extraction.

Reads the graph_processed column added by migration 021.

§Errors

Returns an error if the database query fails.


pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError>

Return the count of messages not yet processed by graph extraction.

§Errors

Returns an error if the database query fails.


pub async fn mark_messages_graph_processed( &self, ids: &[MessageId], ) -> Result<(), MemoryError>

Mark a batch of messages as graph-processed.

§Errors

Returns an error if the database query fails.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.