Skip to main content

EngineHandle

Struct EngineHandle 

Source
pub struct EngineHandle<P, O, V>
where P: PersistencePort + Send + Sync + 'static, O: OraclePort + Send + Sync + 'static, V: VectorPort + Send + Sync + 'static,
{ /* private fields */ }
Expand description

The sole public async entry point for mempill.

Callers: mempill-py, mempill-node, mempill-mcp, integration tests. Cloneable: all fields are Arc-wrapped; clones share the same lock map and port state.

Implementations§

Source§

impl<P, O, V> EngineHandle<P, O, V>
where P: PersistencePort + Send + Sync + 'static, O: OraclePort + Send + Sync + 'static, V: VectorPort + Send + Sync + 'static,

Source

pub fn new( persistence: Arc<P>, oracle: Option<Arc<O>>, vector: Option<Arc<V>>, config: EngineConfig, ) -> Self

Create an EngineHandle without a pending-adjudication store.

QueuedForAdjudication claims will still be committed with the correct disposition, but no pending_adjudications row will be written. Suitable for tests that don’t exercise oracle queue persistence, and for the DefaultEngine alias.

Source

pub fn new_with_pending_store<S>( persistence: Arc<P>, oracle: Option<Arc<O>>, vector: Option<Arc<V>>, pending_store: Arc<dyn ErasedPendingStore>, config: EngineConfig, ) -> Self

Create an EngineHandle with a concrete pending-adjudication store.

The store is type-erased via ErasedPendingStoreAdapter so EngineHandle keeps its 3-param signature unchanged.

Typical usage in adapter crates (e.g. mempill-sqlite):

let engine = EngineHandle::new_with_pending_store(
    Arc::new(persistence_store),
    Some(Arc::new(oracle)),
    None::<Arc<NoOpVector>>,
    Arc::new(ErasedPendingStoreAdapter::new(sqlite_pending_store)),
    EngineConfig::default(),
);
Source

pub async fn ingest_claim( &self, req: IngestClaimRequest, ) -> Result<IngestClaimResponse, MemError>

Write path: async, acquires per-agent_id lock, delegates to IngestClaimUseCase.

Clock is read ONCE here (DETERMINISM): now flows into the use-case as a parameter.

Locking order (must be consistent across all write methods to avoid deadlock):

  1. store_write_lock — serializes all cross-agent SQLite writes (conditional; Postgres skips)
  2. per-agent lock — preserves same-agent serial semantics + Postgres compat
Source

pub async fn query_memory( &self, req: QueryMemoryRequest, ) -> Result<QueryMemoryResponse, MemError>

Read path: no write lock needed. Delegates to QueryMemoryUseCase.

Clock read ONCE here; passed into the sync use-case.

Source

pub async fn query_history( &self, req: QueryHistoryRequest, ) -> Result<QueryHistoryResponse, MemError>

History read path: no write lock needed. Delegates to QueryHistoryUseCase.

Returns the full ordered timeline for a (subject, predicate) subject-line. Each entry is tagged Current or Superseded using the same canonical fold as query_memory — so history.current().value == recall primary value.

Clock read ONCE here; passed into the sync use-case (DETERMINISM).

Source

pub async fn reconcile( &self, req: ReconcileRequest, ) -> Result<ReconcileResponse, MemError>

Reconcile path: acquires write lock per agent_id in the request.

Locking order matches ingest_claim: store_write_lock first (conditional), then per-agent lock.

Source

pub async fn query_audit( &self, req: AuditQueryRequest, ) -> Result<AuditQueryResponse, MemError>

Audit read path: no write lock.

Source

pub async fn submit_adjudication( &self, handle_id: Uuid, response: AdjudicationResponse, ) -> Result<AdjudicationOutcome, MemError>

Oracle resolution path: deliver an oracle verdict and apply it atomically.

Acquires locks in the SAME ORDER as ingest_claim to prevent deadlock:

  1. store_write_lock — serializes all cross-agent SQLite writes (conditional).
  2. per-agent lock — keyed on the agent_id retrieved from the pending row.
§Postgres / async-runtime safety

The postgres sync crate (postgres 0.19) wraps tokio-postgres and calls block_on in Client::drop. Dropping a postgres Client while a tokio runtime is active on the current thread panics with “Cannot start a runtime from within a runtime”.

ALL pending-store I/O (including the agent_id resolution read) is therefore performed inside spawn_blocking so no postgres::Client is ever created or dropped on the async executor thread. This is the same discipline used by ingest_claim.

§Protocol
  1. spawn_blocking — resolve agent_id from the pending row (DB read, safe).
  2. Acquire store_write_lock (SQLite-only) + per-agent write lock (async).
  3. spawn_blocking — run SubmitAdjudicationUseCase::execute (all DB writes).
§Errors
  • MemError::AdjudicationHandleNotFound — handle unknown, expired, or stale.
  • MemError::PendingStore — pending-store I/O error.
  • MemError::Persistence — DB write error during verdict apply.
  • MemError::SpawnBlocking — tokio task join error.
Source

pub async fn list_pending_adjudications( &self, agent_id: Option<AgentId>, ) -> Result<Vec<PendingAdjudicationRow>, MemError>

Read path: list all pending-adjudication rows for an agent (or all agents).

This is a read-only operation — no write lock is acquired. All DB access is performed inside spawn_blocking so no postgres::Client is created or dropped on the async executor thread (same invariant as submit_adjudication).

Returns Ok(vec![]) when no pending store is configured.

Source

pub async fn sweep_expired_adjudications(&self) -> Result<usize, MemError>

Sweep all expired pending-adjudication rows and orphaned QueuedForAdjudication claims.

For each expired pending row (expires_at <= now):

  1. Acquires store_write_lock + per-agent write lock (same order as ingest_claim).
  2. Atomically reverts the challenger QueuedForAdjudication → Contested + ledger entry.
  3. Marks the pending row expired.

Then sweeps orphan claims (QueuedForAdjudication with no pending row): 4. Per orphan: acquires locks, reverts challenger → Contested + ledger entry.

Returns the total count of claims reverted (expired + orphan).

The engine MUST NOT spawn a background task — the host calls this on its own schedule.

If no pending store is configured, returns Ok(0) (sweep is a no-op without oracle queue).

§Postgres / async-runtime safety

ALL pending-store reads (list_expired, list_queued_orphan_claims) are performed inside spawn_blocking so no postgres::Client is created or dropped on the tokio executor thread (same invariant as submit_adjudication).

Trait Implementations§

Source§

impl<P, O, V> Clone for EngineHandle<P, O, V>
where P: PersistencePort + Send + Sync + 'static, O: OraclePort + Send + Sync + 'static, V: VectorPort + Send + Sync + 'static,

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

§

impl<P, O, V> !RefUnwindSafe for EngineHandle<P, O, V>

§

impl<P, O, V> !UnwindSafe for EngineHandle<P, O, V>

§

impl<P, O, V> Freeze for EngineHandle<P, O, V>

§

impl<P, O, V> Send for EngineHandle<P, O, V>

§

impl<P, O, V> Sync for EngineHandle<P, O, V>

§

impl<P, O, V> Unpin for EngineHandle<P, O, V>

§

impl<P, O, V> UnsafeUnpin for EngineHandle<P, O, V>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.