Skip to main content

ConversationMemorySystem

Struct ConversationMemorySystem 

Source
pub struct ConversationMemorySystem {
Show 13 fields pub session_manager: SessionManager, pub context_processor: IncrementalContextProcessor, pub graph_manager: SimpleGraphManager, pub workspace_manager: Arc<WorkspaceManager>, pub storage_actor: StorageActorHandle, pub vector_storage: Arc<dyn VectorStorage>, pub config: SystemConfig, pub performance_monitor: Arc<PerformanceMonitor>, pub circuit_breaker: Arc<CircuitBreaker>, pub system_metrics: Arc<SystemMetrics>, pub content_vectorizer: Arc<OnceCell<Arc<ContentVectorizer>>>, pub semantic_query_engine: Arc<OnceCell<Arc<SemanticQueryEngine>>>, pub embedding_config_holder: Arc<EmbeddingConfigHolder>,
}
Expand description

Conversation memory system using actors and channels

Fields§

§session_manager: SessionManager

Session lifecycle manager with cache

§context_processor: IncrementalContextProcessor

Incremental context processing metrics

§graph_manager: SimpleGraphManager

Knowledge graph metric container

§workspace_manager: Arc<WorkspaceManager>

Workspace management coordinator

§storage_actor: StorageActorHandle

Handle to the async storage actor

§vector_storage: Arc<dyn VectorStorage>

Vector storage backend for semantic search

§config: SystemConfig

System configuration

§performance_monitor: Arc<PerformanceMonitor>

Performance metrics monitor

§circuit_breaker: Arc<CircuitBreaker>

Circuit breaker for fault tolerance

§system_metrics: Arc<SystemMetrics>

Global system metrics

§content_vectorizer: Arc<OnceCell<Arc<ContentVectorizer>>>

Lazily initialized content vectorizer

§semantic_query_engine: Arc<OnceCell<Arc<SemanticQueryEngine>>>

Lazily initialized semantic query engine

§embedding_config_holder: Arc<EmbeddingConfigHolder>

Configuration holder used for lazy embedding initialization

Implementations§

Source§

impl ConversationMemorySystem

Source

pub async fn spawn_background_vectorization( &self, session_id: Uuid, session_arc: Arc<ArcSwapAny<Arc<ActiveSession>>>, )

Vectorize the latest update in the background. Fire-and-forget; failures are logged. Returns immediately after the vectorizer has been initialised — on first call this still waits for the model download (~50 MB for potion-multilingual-128M). The new canonical write path goes through MemoryServiceImpl (its update_context impl), which hands the work to the bounded background crate::pipeline::Pipeline — the pipeline worker runs the same init inside its own task, so callers of update_context never see the model-load cost on the hot path. This legacy method stays as the safety net for direct callers of ConversationMemorySystem::add_incremental_update until they migrate (TODO.md item #4 follow-up, slated for 0.4.0).

Source

pub async fn ensure_semantic_engine_initialized( &self, ) -> Result<Arc<SemanticQueryEngine>, String>

Lazy-initialize semantic query engine on first use

Source

pub async fn vectorize_session(&self, session_id: Uuid) -> Result<usize, String>

Vectorize a session’s content (requires embeddings feature)

Source

pub async fn auto_vectorize_if_enabled( &self, session_id: Uuid, ) -> Result<(), String>

Auto-vectorize only the latest update (incremental vectorization). This is much more efficient than re-vectorizing the entire session. Includes a retry mechanism for transient failures.

Source

pub async fn vectorize_all_sessions( &self, ) -> Result<(usize, usize, usize), String>

Vectorize all sessions in the system with parallel processing. Returns the total number of vectorized items across all sessions, along with statistics. Uses a semaphore to limit concurrent vectorization tasks.

Source

pub async fn semantic_search_global( &self, query: &str, limit: Option<usize>, date_range: Option<(DateTime<Utc>, DateTime<Utc>)>, recency_bias: Option<f32>, ) -> Result<Vec<SemanticSearchResult>, String>

Perform semantic search across all sessions

Source

pub async fn semantic_search_session( &self, session_id: Uuid, query: &str, limit: Option<usize>, date_range: Option<(DateTime<Utc>, DateTime<Utc>)>, recency_bias: Option<f32>, ) -> Result<Vec<SemanticSearchResult>, String>

Perform semantic search within a specific session

Find related content across sessions

Source

pub async fn semantic_search_multisession( &self, session_ids: &[Uuid], query: &str, limit: Option<usize>, date_range: Option<(DateTime<Utc>, DateTime<Utc>)>, recency_bias: Option<f32>, ) -> Result<Vec<SemanticSearchResult>, String>

Perform semantic search across multiple sessions

Source

pub fn get_vectorization_stats(&self) -> Result<HashMap<String, usize>, String>

Get vectorization statistics

Source

pub fn embeddings_enabled(&self) -> bool

Check if embeddings are enabled and initialized

Source

pub async fn enable_embeddings_config(&mut self) -> Result<(), String>

Enable embeddings at runtime (requires restart to initialize components)

Source

pub async fn set_embedding_model( &mut self, model_type: String, ) -> Result<(), String>

Configure embedding model type

Source

pub async fn invalidate_and_rebuild_entity_graph( &self, session_id: Uuid, file_path: &str, ) -> Result<(u32, usize), String>

Invalidate a source file and rebuild the entity graph for the given session.

  1. Removes SourceReference entries for the file (storage layer)
  2. Removes incremental updates referencing the file from the session
  3. Rebuilds entity graph from remaining updates
  4. Persists the updated session

Returns (entries_invalidated, entities_after_rebuild).

Source

pub async fn clear_query_cache(&self) -> Result<(), String>

Clear query cache to prevent stale vector IDs after restart

This should be called on daemon startup to ensure cached query results don’t reference vector IDs from before the restart, which would cause incorrect similarity calculations.

Source§

impl ConversationMemorySystem

Source

pub async fn new( config: SystemConfig, ) -> Result<ConversationMemorySystem, String>

Create system from config

§Errors

Returns an error if storage initialization fails or if system setup encounters any issues

Source

pub async fn new_with_storage( storage: RealRocksDBStorage, config: SystemConfig, ) -> Result<ConversationMemorySystem, String>

Create system with RocksDB storage (backward compatibility)

Source

pub async fn create_session( &self, name: Option<String>, description: Option<String>, ) -> Result<Uuid, String>

Create a new session with name and description

Source

pub async fn get_session( &self, session_id: Uuid, ) -> Result<Arc<ArcSwapAny<Arc<ActiveSession>>>, String>

Get existing session by ID

Source

pub fn get_storage(&self) -> &StorageActorHandle

Get storage actor handle for compatibility (replaces storage.read().await)

Source

pub async fn update_session_metadata( &self, session_id: Uuid, name: Option<String>, description: Option<String>, ) -> Result<(), String>

Update session metadata (compatibility method)

Source

pub async fn delete_session(&self, session_id: Uuid) -> Result<bool, String>

Delete a session: evicts both in-memory caches before tearing down the persisted row. Without the cache evict step, get_session (used by session action=load) would still hand out the cached ActiveSession after the underlying storage row was already gone.

Source

pub async fn delete_entity( &self, session_id: Uuid, entity_name: &str, ) -> Result<bool, String>

Delete a single entity from a session: removes the node + incident edges from the in-memory graph (so cached writes don’t resurrect it) and cascades the same delete to storage (entity row + all 8 edge tables). Returns whether the entity existed.

Source

pub async fn delete_update( &self, session_id: Uuid, entry_id: Uuid, ) -> Result<bool, String>

Delete a single incremental update (context entry) by its entry_id.

Used by MCP manage_entity action=delete_update to clean up ghost/bad rows produced when a client posts content the dispatcher can’t map to a title. Persists the updated session via the storage actor. Returns true if the entry existed.

Source

pub async fn find_sessions_by_name_or_description( &self, query: &str, ) -> Result<Vec<Uuid>, String>

Find sessions by name or description (compatibility method)

Source

pub async fn list_sessions(&self) -> Result<Vec<Uuid>, String>

List all sessions

Source

pub async fn apply_entity_graph_update_now( &self, session_id: Uuid, update: ContextUpdate, ) -> Result<(), String>

Apply an entity-graph update for a session now (no spawn).

Encapsulates the load → mutate → CAS-swap → persist pattern that add_incremental_update_internal previously inlined inside a tokio::spawn. Exposed publicly so the non-blocking crate::pipeline workers can drive entity-graph maintenance off the hot write path.

Idempotent w.r.t. concurrent updates: a failed CAS-swap is treated as “another writer won” and silently dropped (the next update catches up). Errors propagate so the caller — typically a pipeline worker — can log them and continue.

Source

pub async fn vectorize_latest_update_now( &self, session_id: Uuid, ) -> Result<usize, String>

Vectorize the latest update for a session now (no spawn).

Equivalent body of Self::spawn_background_vectorization without the surrounding tokio::spawn — lets the crate::pipeline::EmbeddingQueue worker run vectorization from inside its own task while the write path stays in single-digit ms.

Source

pub async fn add_incremental_update( &self, session_id: Uuid, description: String, metadata: Option<Value>, ) -> Result<String, String>

Add incremental update - compatibility wrapper

Source

pub async fn get_conversation_context( &self, session_id: Uuid, ) -> Result<String, String>

Get conversation context

Source

pub fn get_system_health(&self) -> SystemHealth

Get system health - all atomic reads

Source

pub async fn cleanup_expired_sessions(&self)

Clean up expired sessions (background task). Also enforces memory limits on the active_sessions DashMap.

Source

pub async fn force_memory_cleanup(&self, target_size: usize)

Force cleanup to reduce memory usage immediately. Useful when the system is under memory pressure.

Source

pub async fn save_workspace_metadata( &self, workspace_id: Uuid, name: &str, description: &str, session_ids: &[Uuid], ) -> Result<(), String>

Save workspace metadata (proxy to storage actor)

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<U> As for U

Source§

fn as_<T>(self) -> T
where T: CastFrom<U>,

Casts self to type T. The semantics of numeric casting with the as operator are followed, so <T as As>::as_::<U> can be used in the same way as T as U for numeric conversions. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<L> LayerExt<L> for L

Source§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<G1, G2> Within<G2> for G1
where G2: Contains<G1>,

Source§

fn is_within(&self, b: &G2) -> bool

Source§

impl<G1, G2> Within<G2> for G1
where G2: Contains<G1>,

Source§

fn is_within(&self, b: &G2) -> bool

Source§

impl<T> ErasedDestructor for T
where T: 'static,