pub struct EngineHandle<P, O, V>
where
    P: PersistencePort + Send + Sync + 'static,
    O: OraclePort + Send + Sync + 'static,
    V: VectorPort + Send + Sync + 'static,
{ /* private fields */ }
The sole public async entry point for mempill.
Callers: mempill-py, mempill-node, mempill-mcp, integration tests.
Cloneable: all fields are Arc-wrapped; clones share the same lock map and port state.
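The sharing behaviour of clones can be illustrated with a minimal std-only sketch. `SketchHandle` and its `locks` field are hypothetical stand-ins, not mempill's actual private fields; the only point is that cloning a struct made of `Arc`s copies pointers, so every clone sees the same lock map.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a handle whose fields are all Arc-wrapped.
#[derive(Clone)]
pub struct SketchHandle {
    // Assumed shape: one lock per agent id, shared by every clone.
    locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
}

impl SketchHandle {
    pub fn new() -> Self {
        Self {
            locks: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Returns the lock for `agent_id`, creating it on first use.
    pub fn lock_for(&self, agent_id: &str) -> Arc<Mutex<()>> {
        let mut map = self.locks.lock().unwrap();
        map.entry(agent_id.to_string())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone()
    }
}
```

Asking two clones for the lock of the same agent yields the same underlying `Mutex`, which is what makes per-agent serialization hold across clones.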
Implementations§
impl<P, O, V> EngineHandle<P, O, V>
where
    P: PersistencePort + Send + Sync + 'static,
    O: OraclePort + Send + Sync + 'static,
    V: VectorPort + Send + Sync + 'static,
pub fn new(
    persistence: Arc<P>,
    oracle: Option<Arc<O>>,
    vector: Option<Arc<V>>,
    config: EngineConfig,
) -> Self
Create an EngineHandle without a pending-adjudication store.
QueuedForAdjudication claims will still be committed with the correct disposition,
but no pending_adjudications row will be written. Suitable for tests that don’t
exercise oracle queue persistence, and for the DefaultEngine alias.
pub fn new_with_pending_store<S>(
    persistence: Arc<P>,
    oracle: Option<Arc<O>>,
    vector: Option<Arc<V>>,
    pending_store: Arc<dyn ErasedPendingStore>,
    config: EngineConfig,
) -> Self
Create an EngineHandle with a concrete pending-adjudication store.
The store is type-erased via ErasedPendingStoreAdapter, so EngineHandle keeps
its three type parameters (P, O, V) unchanged.
Typical usage in adapter crates (e.g. mempill-sqlite):
    let engine = EngineHandle::new_with_pending_store(
        Arc::new(persistence_store),
        Some(Arc::new(oracle)),
        None::<Arc<NoOpVector>>,
        Arc::new(ErasedPendingStoreAdapter::new(sqlite_pending_store)),
        EngineConfig::default(),
    );
pub async fn ingest_claim(
    &self,
    req: IngestClaimRequest,
) -> Result<IngestClaimResponse, MemError>
Write path: async, acquires per-agent_id lock, delegates to IngestClaimUseCase.
Clock is read ONCE here (DETERMINISM): now flows into the use-case as a parameter.
Locking order (must be consistent across all write methods to avoid deadlock):
1. store_write_lock: serializes all cross-agent SQLite writes (conditional; Postgres skips it)
2. per-agent lock: preserves same-agent serial semantics and Postgres compatibility
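The locking order above can be sketched with std primitives. This is an illustration under stated assumptions, not the engine's code: `Locks`, `with_write_locks`, and the `serialize_store_writes` flag are hypothetical names, and `std::sync::Mutex` stands in for whatever async lock the engine actually uses.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical lock set; field names mirror the doc text, not the real struct.
pub struct Locks {
    // `None` models a backend (e.g. Postgres) that skips the store-wide lock.
    store_write_lock: Option<Mutex<()>>,
    per_agent: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}

impl Locks {
    pub fn new(serialize_store_writes: bool) -> Self {
        Self {
            store_write_lock: serialize_store_writes.then(|| Mutex::new(())),
            per_agent: Mutex::new(HashMap::new()),
        }
    }

    /// Runs `write` while holding the locks in the documented order:
    /// store-wide lock first (if present), then the per-agent lock.
    pub fn with_write_locks<T>(&self, agent_id: &str, write: impl FnOnce() -> T) -> T {
        let _store = self.store_write_lock.as_ref().map(|m| m.lock().unwrap());
        let agent = {
            let mut map = self.per_agent.lock().unwrap();
            map.entry(agent_id.to_string())
                .or_insert_with(|| Arc::new(Mutex::new(())))
                .clone()
        };
        let _agent = agent.lock().unwrap();
        write()
    }
}
```

Because every write method goes through the same helper, no two callers can take the two locks in opposite orders, which is the condition that rules out lock-order deadlock.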
pub async fn query_memory(
    &self,
    req: QueryMemoryRequest,
) -> Result<QueryMemoryResponse, MemError>
Read path: no write lock needed. Delegates to QueryMemoryUseCase.
Clock read ONCE here; passed into the sync use-case.
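The "clock read once" rule can be sketched as follows. The types and functions here (`Record`, `live_values`, `query`) are hypothetical, and the expiry filter is only an example of time-dependent logic; the point is that the use-case takes `now` as an argument and never reads the clock itself.

```rust
use std::time::SystemTime;

/// Hypothetical record; `expires_at` is an assumed field for illustration.
pub struct Record {
    pub value: &'static str,
    pub expires_at: SystemTime,
}

/// Sync "use-case": deterministic because `now` is a parameter.
pub fn live_values(records: &[Record], now: SystemTime) -> Vec<&'static str> {
    records
        .iter()
        .filter(|r| r.expires_at > now)
        .map(|r| r.value)
        .collect()
}

/// Entry point: the only place the clock is read.
pub fn query(records: &[Record]) -> Vec<&'static str> {
    let now = SystemTime::now(); // read once, then passed down
    live_values(records, now)
}
```

With this split, tests can call `live_values` with a fixed timestamp and get the same answer on every run.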
pub async fn query_history(
    &self,
    req: QueryHistoryRequest,
) -> Result<QueryHistoryResponse, MemError>
History read path: no write lock needed. Delegates to QueryHistoryUseCase.
Returns the full ordered timeline for a (subject, predicate) subject-line.
Each entry is tagged Current or Superseded using the same canonical fold
as query_memory — so history.current().value == recall primary value.
Clock read ONCE here; passed into the sync use-case (DETERMINISM).
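One way to keep history and recall in agreement is to route both through a single fold function. The sketch below assumes a deliberately simple fold rule (the entry with the greatest `asserted_at` wins); mempill's canonical fold is not described on this page, so `Entry`, `Status`, `fold_current`, `recall`, and `history` are all hypothetical.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Status {
    Current,
    Superseded,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub value: String,
    pub asserted_at: u64,
}

/// ASSUMED fold rule (not mempill's): greatest `asserted_at` wins,
/// ties go to the later position in the slice.
pub fn fold_current(entries: &[Entry]) -> Option<usize> {
    entries
        .iter()
        .enumerate()
        .max_by_key(|(i, e)| (e.asserted_at, *i))
        .map(|(i, _)| i)
}

/// Recall path: the value of the entry picked by the shared fold.
pub fn recall(entries: &[Entry]) -> Option<&str> {
    fold_current(entries).map(|i| entries[i].value.as_str())
}

/// History path: every entry, tagged by the same fold.
pub fn history(entries: &[Entry]) -> Vec<(Status, &Entry)> {
    let current = fold_current(entries);
    entries
        .iter()
        .enumerate()
        .map(|(i, e)| {
            let status = if Some(i) == current {
                Status::Current
            } else {
                Status::Superseded
            };
            (status, e)
        })
        .collect()
}
```

Since both paths call `fold_current`, the entry tagged `Current` in the timeline is by construction the one whose value recall returns.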
pub async fn reconcile(
    &self,
    req: ReconcileRequest,
) -> Result<ReconcileResponse, MemError>
Reconcile path: acquires write lock per agent_id in the request.
Locking order matches ingest_claim: store_write_lock first (conditional), then per-agent lock.
pub async fn query_audit(
    &self,
    req: AuditQueryRequest,
) -> Result<AuditQueryResponse, MemError>
Audit read path: no write lock.
pub async fn submit_adjudication(
    &self,
    handle_id: Uuid,
    response: AdjudicationResponse,
) -> Result<AdjudicationOutcome, MemError>
Oracle resolution path: deliver an oracle verdict and apply it atomically.
Acquires locks in the SAME ORDER as ingest_claim to prevent deadlock:
1. store_write_lock: serializes all cross-agent SQLite writes (conditional).
2. per-agent lock: keyed on the agent_id retrieved from the pending row.
§Postgres / async-runtime safety
The postgres sync crate (postgres 0.19) wraps tokio-postgres and calls block_on
in Client::drop. Dropping a postgres Client while a tokio runtime is active on the
current thread panics with “Cannot start a runtime from within a runtime”.
ALL pending-store I/O (including the agent_id resolution read) is therefore performed
inside spawn_blocking so no postgres::Client is ever created or dropped on the
async executor thread. This is the same discipline used by ingest_claim.
§Protocol
1. spawn_blocking: resolve agent_id from the pending row (DB read, safe).
2. Acquire store_write_lock (SQLite-only) + per-agent write lock (async).
3. spawn_blocking: run SubmitAdjudicationUseCase::execute (all DB writes).
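The three-step protocol can be sketched without a runtime dependency. Everything below is hypothetical: `Sketch`, `SketchError`, and `run_blocking` are illustrative names, `std::thread::spawn` plus `join` stands in for tokio's `spawn_blocking`, the store-wide lock is omitted, and "applying the verdict" is reduced to removing the pending row.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug, PartialEq)]
pub enum SketchError {
    HandleNotFound,
    Join,
}

/// Hypothetical in-memory engine: pending rows map a handle id to an agent id.
pub struct Sketch {
    pending: Arc<Mutex<HashMap<u64, String>>>,
    agent_locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}

/// Stand-in for `spawn_blocking`: run blocking work on another thread and wait.
fn run_blocking<T, F>(work: F) -> Result<T, SketchError>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    thread::spawn(work).join().map_err(|_| SketchError::Join)
}

impl Sketch {
    pub fn new() -> Self {
        Self {
            pending: Arc::new(Mutex::new(HashMap::new())),
            agent_locks: Mutex::new(HashMap::new()),
        }
    }

    pub fn insert_pending(&self, handle_id: u64, agent_id: &str) {
        self.pending.lock().unwrap().insert(handle_id, agent_id.to_string());
    }

    pub fn submit(&self, handle_id: u64) -> Result<String, SketchError> {
        // Step 1: resolve the agent id off the calling thread (read only).
        let rows = Arc::clone(&self.pending);
        let agent_id = run_blocking(move || rows.lock().unwrap().get(&handle_id).cloned())?
            .ok_or(SketchError::HandleNotFound)?;

        // Step 2: take the per-agent lock, keyed on the resolved agent id.
        let lock = self
            .agent_locks
            .lock()
            .unwrap()
            .entry(agent_id.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();
        let _guard = lock.lock().unwrap();

        // Step 3: apply the verdict off-thread; here that only removes the row.
        let rows = Arc::clone(&self.pending);
        run_blocking(move || rows.lock().unwrap().remove(&handle_id))?
            .ok_or(SketchError::HandleNotFound)?;
        Ok(agent_id)
    }
}
```

The read in step 1 has to happen before any lock is taken because the per-agent lock key is not known until the pending row has been loaded; submitting the same handle twice fails on the second call because the row is gone.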
§Errors
- MemError::AdjudicationHandleNotFound: handle unknown, expired, or stale.
- MemError::PendingStore: pending-store I/O error.
- MemError::Persistence: DB write error during verdict apply.
- MemError::SpawnBlocking: tokio task join error.
pub async fn list_pending_adjudications(
    &self,
    agent_id: Option<AgentId>,
) -> Result<Vec<PendingAdjudicationRow>, MemError>
Read path: list all pending-adjudication rows for an agent (or all agents).
This is a read-only operation — no write lock is acquired. All DB access
is performed inside spawn_blocking so no postgres::Client is created or
dropped on the async executor thread (same invariant as submit_adjudication).
Returns Ok(vec![]) when no pending store is configured.
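The "no store configured means an empty result" behaviour can be sketched with an optional trait object. `PendingList`, `FixedStore`, and `Handle` are hypothetical; the real `ErasedPendingStore` trait is not shown on this page, and the real method is async and returns a `Result`.

```rust
use std::sync::Arc;

/// Hypothetical object-safe listing trait (not the real `ErasedPendingStore`).
pub trait PendingList: Send + Sync {
    fn list(&self, agent_id: Option<&str>) -> Vec<String>;
}

/// Toy store holding (agent id, handle label) pairs.
pub struct FixedStore(pub Vec<(String, String)>);

impl PendingList for FixedStore {
    fn list(&self, agent_id: Option<&str>) -> Vec<String> {
        self.0
            .iter()
            // `None` means "all agents"; `Some(id)` filters to one agent.
            .filter(|(agent, _)| agent_id.map_or(true, |wanted| wanted == agent.as_str()))
            .map(|(_, handle)| handle.clone())
            .collect()
    }
}

pub struct Handle {
    pub pending: Option<Arc<dyn PendingList>>,
}

impl Handle {
    pub fn list_pending(&self, agent_id: Option<&str>) -> Vec<String> {
        match &self.pending {
            Some(store) => store.list(agent_id),
            // No store configured: an empty list, not an error.
            None => Vec::new(),
        }
    }
}
```

Treating a missing store as an empty list lets callers use one code path whether or not the oracle queue is enabled.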
pub async fn sweep_expired_adjudications(&self) -> Result<usize, MemError>
Sweep all expired pending-adjudication rows and orphaned QueuedForAdjudication claims.
For each expired pending row (expires_at <= now):
1. Acquires store_write_lock + per-agent write lock (same order as ingest_claim).
2. Atomically reverts the challenger QueuedForAdjudication → Contested + ledger entry.
3. Marks the pending row expired.
Then sweeps orphan claims (QueuedForAdjudication with no pending row):
4. Per orphan: acquires locks, reverts challenger → Contested + ledger entry.
Returns the total count of claims reverted (expired + orphan).
The engine MUST NOT spawn a background task — the host calls this on its own schedule.
If no pending store is configured, returns Ok(0) (sweep is a no-op without oracle queue).
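The two sweep passes and the returned count can be modelled on in-memory data. This is a sketch under assumptions: `Store`, `PendingRow`, `Disposition`, and `sweep` are hypothetical, timestamps are plain integers, and locking and ledger writes are left out.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Disposition {
    QueuedForAdjudication,
    Contested,
}

pub struct PendingRow {
    pub claim: usize,    // index into `Store::claims`
    pub expires_at: u64, // assumed unit: seconds since some epoch
    pub expired: bool,
}

pub struct Store {
    pub claims: Vec<Disposition>,
    pub pending: Vec<PendingRow>,
}

/// Reverts expired and orphaned queued claims; returns how many were reverted.
pub fn sweep(store: &mut Store, now: u64) -> usize {
    let mut reverted = 0;

    // Pass 1: pending rows with expires_at <= now that are not yet marked.
    for row in store.pending.iter_mut() {
        if row.expired || row.expires_at > now {
            continue;
        }
        if store.claims[row.claim] == Disposition::QueuedForAdjudication {
            store.claims[row.claim] = Disposition::Contested;
            reverted += 1;
        }
        row.expired = true;
    }

    // Pass 2: queued claims that no pending row refers to (orphans).
    for i in 0..store.claims.len() {
        let orphan = !store.pending.iter().any(|r| r.claim == i);
        if orphan && store.claims[i] == Disposition::QueuedForAdjudication {
            store.claims[i] = Disposition::Contested;
            reverted += 1;
        }
    }

    reverted
}
```

A host would invoke the real `sweep_expired_adjudications` from its own timer or scheduler; in this model, a second call with the same `now` returns 0 because nothing is left to revert.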
§Postgres / async-runtime safety
ALL pending-store reads (list_expired, list_queued_orphan_claims) are performed
inside spawn_blocking so no postgres::Client is created or dropped on the tokio
executor thread (same invariant as submit_adjudication).