Skip to main content

mempill_core/
engine_handle.rs

1//! EngineHandle — the sole public async entry point for mempill.
2//!
3//! Owns `Arc<impl Port>` references plus the per-agent_id write lock map.
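//! Public methods follow this pattern:
//!   1. Time-dependent operations read the clock ONCE at the async boundary (`now = Utc::now()`).
//!   2. Write operations acquire the store-level write lock (only when the adapter requires
//!      it) and then the per-agent_id write lock, always in that order.
//!   3. The work is delegated to the sync use-case via `tokio::task::spawn_blocking`.
//!   4. A `JoinError` from the blocking task is mapped to `MemError::SpawnBlocking`.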
9//!
10//! The use-case layer is fully synchronous — no async code below this file.
11//!
12//! # Pending-adjudication port
13//!
14//! `EngineHandle` carries an optional `Arc<dyn ErasedPendingStore>` for the oracle queue.
15//! Use `EngineHandle::new` for the standard case (no pending store) and
16//! `EngineHandle::new_with_pending_store` when wiring in a concrete adapter.
17//! The type-erasure lets `EngineHandle<P, O, V>` keep its existing 3-param signature.
18
19use std::sync::Arc;
20
21use chrono::Utc;
22use tokio::task;
23
24use crate::{
25    application::{
26        audit::AuditUseCase,
27        dto::{
28            AuditQueryRequest, AuditQueryResponse, IngestClaimRequest, IngestClaimResponse,
29            QueryHistoryRequest, QueryHistoryResponse, QueryMemoryRequest, QueryMemoryResponse,
30            ReconcileRequest, ReconcileResponse,
31        },
32        ingest_claim::IngestClaimUseCase,
33        query_history::QueryHistoryUseCase,
34        query_memory::QueryMemoryUseCase,
35        reconcile::ReconcileUseCase,
36        submit_adjudication::SubmitAdjudicationUseCase,
37        sweep_adjudications::SweepAdjudicationsUseCase,
38    },
39    concurrency::agent_lock::AgentWriteLockMap,
40    config::EngineConfig,
41    error::MemError,
42    ports::{OraclePort, PendingAdjudicationPort, PersistencePort, VectorPort},
43};
44
45// ── Type-erased pending store ─────────────────────────────────────────────────
46//
47// `PendingAdjudicationPort` is NOT object-safe in its generic form because `Self::Error`
48// is an associated type. We introduce a thin object-safe erasing wrapper that boxes errors.
49
50/// Object-safe erasing wrapper for `PendingAdjudicationPort`.
51///
52/// Adapters implement `PendingAdjudicationPort`; this wrapper is created via
53/// `ErasedPendingStoreAdapter::new(concrete_store)` and stored as `Arc<dyn ErasedPendingStore>`.
54#[allow(missing_docs)]
55pub trait ErasedPendingStore: Send + Sync + 'static {
56    fn insert_pending_erased(
57        &self,
58        row: &crate::ports::pending_adjudication::PendingAdjudicationRow,
59    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;
60
61    fn get_pending_erased(
62        &self,
63        handle_id: uuid::Uuid,
64    ) -> Result<Option<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>>;
65
66    fn list_pending_erased(
67        &self,
68        agent_id: Option<&mempill_types::AgentId>,
69    ) -> Result<Vec<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>>;
70
71    fn list_expired_erased(
72        &self,
73        now: chrono::DateTime<chrono::Utc>,
74    ) -> Result<Vec<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>>;
75
76    fn mark_resolved_erased(
77        &self,
78        handle_id: uuid::Uuid,
79    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;
80
81    fn mark_expired_erased(
82        &self,
83        handle_id: uuid::Uuid,
84    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;
85
86    fn list_queued_orphan_claims_erased(
87        &self,
88    ) -> Result<Vec<crate::ports::pending_adjudication::OrphanedQueuedClaim>, Box<dyn std::error::Error + Send + Sync + 'static>>;
89}
90
91/// Adapter that wraps a concrete `PendingAdjudicationPort` impl as `dyn ErasedPendingStore`.
92pub struct ErasedPendingStoreAdapter<S: PendingAdjudicationPort> {
93    inner: S,
94}
95
96impl<S: PendingAdjudicationPort> ErasedPendingStoreAdapter<S> {
97    /// Wrap a concrete `PendingAdjudicationPort` impl as `dyn ErasedPendingStore`.
98    pub fn new(inner: S) -> Self {
99        Self { inner }
100    }
101}
102
103impl<S: PendingAdjudicationPort> ErasedPendingStore for ErasedPendingStoreAdapter<S> {
104    fn insert_pending_erased(
105        &self,
106        row: &crate::ports::pending_adjudication::PendingAdjudicationRow,
107    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
108        self.inner.insert_pending(row).map_err(|e| Box::new(e) as _)
109    }
110
111    fn get_pending_erased(
112        &self,
113        handle_id: uuid::Uuid,
114    ) -> Result<Option<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>> {
115        self.inner.get_pending(handle_id).map_err(|e| Box::new(e) as _)
116    }
117
118    fn list_pending_erased(
119        &self,
120        agent_id: Option<&mempill_types::AgentId>,
121    ) -> Result<Vec<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>> {
122        self.inner.list_pending(agent_id).map_err(|e| Box::new(e) as _)
123    }
124
125    fn list_expired_erased(
126        &self,
127        now: chrono::DateTime<chrono::Utc>,
128    ) -> Result<Vec<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>> {
129        self.inner.list_expired(now).map_err(|e| Box::new(e) as _)
130    }
131
132    fn mark_resolved_erased(
133        &self,
134        handle_id: uuid::Uuid,
135    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
136        self.inner.mark_resolved(handle_id).map_err(|e| Box::new(e) as _)
137    }
138
139    fn mark_expired_erased(
140        &self,
141        handle_id: uuid::Uuid,
142    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
143        self.inner.mark_expired(handle_id).map_err(|e| Box::new(e) as _)
144    }
145
146    fn list_queued_orphan_claims_erased(
147        &self,
148    ) -> Result<Vec<crate::ports::pending_adjudication::OrphanedQueuedClaim>, Box<dyn std::error::Error + Send + Sync + 'static>> {
149        self.inner.list_queued_orphan_claims().map_err(|e| Box::new(e) as _)
150    }
151}
152
153// ── EngineHandle ──────────────────────────────────────────────────────────────
154
/// The sole public async entry point for mempill.
///
/// Callers: mempill-py, mempill-node, mempill-mcp, integration tests.
///
/// Cloneable: the ports, the pending store and the store-level write lock are `Arc`-shared
/// between clones. `config` and `write_locks` are duplicated through their own `Clone`
/// impls; the per-agent lock map is presumably shared internally so clones contend on the
/// same locks — NOTE(review): confirm against `AgentWriteLockMap`.
pub struct EngineHandle<P, O, V>
where
    P: PersistencePort + Send + Sync + 'static,
    O: OraclePort + Send + Sync + 'static,
    V: VectorPort + Send + Sync + 'static,
{
    /// Persistence port; also queried for `requires_global_write_serialization()`.
    persistence: Arc<P>,
    /// Optional oracle port, handed to the ingest and reconcile use-cases.
    oracle: Option<Arc<O>>,
    /// Optional vector port, handed to the query use-cases.
    vector: Option<Arc<V>>,
    /// Type-erased pending-adjudication store. `None` when no oracle queue is configured.
    pending_store: Option<Arc<dyn ErasedPendingStore>>,
    /// Engine configuration; cloned into each use-case that takes one.
    config: EngineConfig,
    /// Per-agent_id write locks, acquired AFTER `store_write_lock` by every write path.
    write_locks: AgentWriteLockMap,
    /// Store-level write lock: serializes ALL writes across agent_ids to prevent
    /// concurrent SQLite transactions from different agents on the same connection.
    /// Only taken when the persistence adapter reports that it requires global write
    /// serialization. Reads (query_memory, query_audit) never acquire this lock.
    store_write_lock: Arc<tokio::sync::Mutex<()>>,
}
177
178impl<P, O, V> EngineHandle<P, O, V>
179where
180    P: PersistencePort + Send + Sync + 'static,
181    O: OraclePort + Send + Sync + 'static,
182    V: VectorPort + Send + Sync + 'static,
183{
184    /// Create an `EngineHandle` without a pending-adjudication store.
185    ///
186    /// QueuedForAdjudication claims will still be committed with the correct disposition,
187    /// but no `pending_adjudications` row will be written. Suitable for tests that don't
188    /// exercise oracle queue persistence, and for the `DefaultEngine` alias.
189    pub fn new(
190        persistence: Arc<P>,
191        oracle: Option<Arc<O>>,
192        vector: Option<Arc<V>>,
193        config: EngineConfig,
194    ) -> Self {
195        Self {
196            persistence,
197            oracle,
198            vector,
199            pending_store: None,
200            config,
201            write_locks: AgentWriteLockMap::new(),
202            store_write_lock: Arc::new(tokio::sync::Mutex::new(())),
203        }
204    }
205
206    /// Create an `EngineHandle` with a concrete pending-adjudication store.
207    ///
208    /// The store is type-erased via [`ErasedPendingStoreAdapter`] so `EngineHandle` keeps
209    /// its 3-param signature unchanged.
210    ///
211    /// Typical usage in adapter crates (e.g. mempill-sqlite):
212    /// ```rust,ignore
213    /// let engine = EngineHandle::new_with_pending_store(
214    ///     Arc::new(persistence_store),
215    ///     Some(Arc::new(oracle)),
216    ///     None::<Arc<NoOpVector>>,
217    ///     Arc::new(ErasedPendingStoreAdapter::new(sqlite_pending_store)),
218    ///     EngineConfig::default(),
219    /// );
220    /// ```
221    pub fn new_with_pending_store<S>(
222        persistence: Arc<P>,
223        oracle: Option<Arc<O>>,
224        vector: Option<Arc<V>>,
225        pending_store: Arc<dyn ErasedPendingStore>,
226        config: EngineConfig,
227    ) -> Self {
228        Self {
229            persistence,
230            oracle,
231            vector,
232            pending_store: Some(pending_store),
233            config,
234            write_locks: AgentWriteLockMap::new(),
235            store_write_lock: Arc::new(tokio::sync::Mutex::new(())),
236        }
237    }
238
239    /// Write path: async, acquires per-agent_id lock, delegates to IngestClaimUseCase.
240    ///
241    /// Clock is read ONCE here (DETERMINISM): `now` flows into the use-case as a parameter.
242    ///
243    /// Locking order (must be consistent across all write methods to avoid deadlock):
244    ///   1. store_write_lock  — serializes all cross-agent SQLite writes (conditional; Postgres skips)
245    ///   2. per-agent lock    — preserves same-agent serial semantics + Postgres compat
246    pub async fn ingest_claim(
247        &self,
248        req: IngestClaimRequest,
249    ) -> Result<IngestClaimResponse, MemError> {
250        let now = Utc::now(); // clock read ONCE at the async boundary
251        // Acquire global write lock only when the adapter requires it (SQLite=yes, Postgres=no).
252        let _store_lock = if self.persistence.requires_global_write_serialization() {
253            Some(self.store_write_lock.lock().await)
254        } else {
255            None
256        };
257        let _guard = self.write_locks.acquire(&req.agent_id).await;
258        let uc = IngestClaimUseCase::new(
259            Arc::clone(&self.persistence),
260            self.oracle.clone(),
261            self.pending_store.clone(),
262            self.config.clone(),
263        );
264        task::spawn_blocking(move || uc.execute_with_time(req, now))
265            .await
266            .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
267    }
268
269    /// Read path: no write lock needed. Delegates to QueryMemoryUseCase.
270    ///
271    /// Clock read ONCE here; passed into the sync use-case.
272    pub async fn query_memory(
273        &self,
274        req: QueryMemoryRequest,
275    ) -> Result<QueryMemoryResponse, MemError> {
276        let now = Utc::now();
277        let uc = QueryMemoryUseCase::new(
278            Arc::clone(&self.persistence),
279            self.vector.clone(),
280            self.config.clone(),
281        );
282        task::spawn_blocking(move || uc.execute_with_time(req, now))
283            .await
284            .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
285    }
286
287    /// History read path: no write lock needed. Delegates to QueryHistoryUseCase.
288    ///
289    /// Returns the full ordered timeline for a (subject, predicate) subject-line.
290    /// Each entry is tagged `Current` or `Superseded` using the same canonical fold
291    /// as `query_memory` — so `history.current().value == recall primary value`.
292    ///
293    /// Clock read ONCE here; passed into the sync use-case (DETERMINISM).
294    pub async fn query_history(
295        &self,
296        req: QueryHistoryRequest,
297    ) -> Result<QueryHistoryResponse, MemError> {
298        let now = Utc::now();
299        let uc = QueryHistoryUseCase::new(
300            Arc::clone(&self.persistence),
301            self.vector.clone(),
302            self.config.clone(),
303        );
304        task::spawn_blocking(move || uc.execute_with_time(req, now))
305            .await
306            .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
307    }
308
309    /// Reconcile path: acquires write lock per agent_id in the request.
310    ///
311    /// Locking order matches ingest_claim: store_write_lock first (conditional), then per-agent lock.
312    pub async fn reconcile(
313        &self,
314        req: ReconcileRequest,
315    ) -> Result<ReconcileResponse, MemError> {
316        // Acquire global write lock only when the adapter requires it (SQLite=yes, Postgres=no).
317        let _store_lock = if self.persistence.requires_global_write_serialization() {
318            Some(self.store_write_lock.lock().await)
319        } else {
320            None
321        };
322        let _guard = self.write_locks.acquire(&req.agent_id).await;
323        let uc = ReconcileUseCase::new(
324            Arc::clone(&self.persistence),
325            self.oracle.clone(),
326            self.config.clone(),
327        );
328        task::spawn_blocking(move || uc.execute(req))
329            .await
330            .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
331    }
332
333    /// Audit read path: no write lock.
334    pub async fn query_audit(
335        &self,
336        req: AuditQueryRequest,
337    ) -> Result<AuditQueryResponse, MemError> {
338        let uc = AuditUseCase::new(Arc::clone(&self.persistence));
339        task::spawn_blocking(move || uc.execute(req))
340            .await
341            .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
342    }
343
344    /// Oracle resolution path: deliver an oracle verdict and apply it atomically.
345    ///
346    /// Acquires locks in the SAME ORDER as `ingest_claim` to prevent deadlock:
347    ///   1. `store_write_lock`  — serializes all cross-agent SQLite writes (conditional).
348    ///   2. per-agent lock      — keyed on the `agent_id` retrieved from the pending row.
349    ///
350    /// # Postgres / async-runtime safety
351    ///
352    /// The postgres sync crate (`postgres 0.19`) wraps `tokio-postgres` and calls `block_on`
353    /// in `Client::drop`. Dropping a postgres `Client` while a tokio runtime is active on the
354    /// current thread panics with "Cannot start a runtime from within a runtime".
355    ///
356    /// ALL pending-store I/O (including the agent_id resolution read) is therefore performed
357    /// inside `spawn_blocking` so no `postgres::Client` is ever created or dropped on the
358    /// async executor thread. This is the same discipline used by `ingest_claim`.
359    ///
360    /// # Protocol
361    ///
362    /// 1. `spawn_blocking` — resolve `agent_id` from the pending row (DB read, safe).
363    /// 2. Acquire `store_write_lock` (SQLite-only) + per-agent write lock (async).
364    /// 3. `spawn_blocking` — run `SubmitAdjudicationUseCase::execute` (all DB writes).
365    ///
366    /// # Errors
367    ///
368    /// - `MemError::AdjudicationHandleNotFound` — handle unknown, expired, or stale.
369    /// - `MemError::PendingStore` — pending-store I/O error.
370    /// - `MemError::Persistence` — DB write error during verdict apply.
371    /// - `MemError::SpawnBlocking` — tokio task join error.
372    pub async fn submit_adjudication(
373        &self,
374        handle_id: uuid::Uuid,
375        response: mempill_types::AdjudicationResponse,
376    ) -> Result<mempill_types::AdjudicationOutcome, MemError> {
377        let now = Utc::now(); // clock read ONCE at the async boundary (DETERMINISM)
378
379        // ── Step 1: Resolve agent_id via spawn_blocking (NO async-context DB access) ──
380        //
381        // The postgres sync crate calls `block_on` in `Client::drop`. Reading the pending
382        // store directly in the async context would drop a postgres connection on the tokio
383        // thread, causing a panic. `spawn_blocking` moves the drop to a dedicated OS thread
384        // where no tokio runtime is active. The use-case re-reads the row inside its own
385        // spawn_blocking (Step 3) for the authoritative state-guard.
386        let pending_store = self.pending_store.as_ref()
387            .ok_or(MemError::AdjudicationHandleNotFound { handle_id })?;
388        let pending_store_arc = Arc::clone(pending_store);
389
390        let resolve_result = task::spawn_blocking(move || {
391            let row = pending_store_arc
392                .get_pending_erased(handle_id)
393                .map_err(|e| MemError::PendingStore { source: e })?
394                .ok_or(MemError::AdjudicationHandleNotFound { handle_id })?;
395            Ok::<_, MemError>(row)
396        })
397        .await
398        .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })??;
399
400        let row = resolve_result;
401
402        // Note: TTL expiry is handled authoritatively inside SubmitAdjudicationUseCase
403        // (which also writes the AdjudicationExpired ledger entry). Do NOT early-reject
404        // here — the use-case must run so the audit trail is complete.
405        let agent_id = row.agent_id.clone();
406
407        // ── Step 2: Acquire locks in the same order as ingest_claim ──────────────
408        let _store_lock = if self.persistence.requires_global_write_serialization() {
409            Some(self.store_write_lock.lock().await)
410        } else {
411            None
412        };
413        let _guard = self.write_locks.acquire(&agent_id).await;
414
415        // ── Step 3: Dispatch to sync use-case via spawn_blocking ─────────────────
416        let pending_store_arc2 = Arc::clone(pending_store);
417        let uc = SubmitAdjudicationUseCase::new(
418            Arc::clone(&self.persistence),
419            pending_store_arc2,
420        );
421        task::spawn_blocking(move || uc.execute(handle_id, response, now))
422            .await
423            .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
424    }
425
426    /// Read path: list all pending-adjudication rows for an agent (or all agents).
427    ///
428    /// This is a read-only operation — no write lock is acquired.  All DB access
429    /// is performed inside `spawn_blocking` so no `postgres::Client` is created or
430    /// dropped on the async executor thread (same invariant as `submit_adjudication`).
431    ///
432    /// Returns `Ok(vec![])` when no pending store is configured.
433    pub async fn list_pending_adjudications(
434        &self,
435        agent_id: Option<mempill_types::AgentId>,
436    ) -> Result<Vec<crate::ports::pending_adjudication::PendingAdjudicationRow>, MemError> {
437        let pending_store = match &self.pending_store {
438            Some(ps) => Arc::clone(ps),
439            None => return Ok(vec![]),
440        };
441
442        task::spawn_blocking(move || {
443            pending_store
444                .list_pending_erased(agent_id.as_ref())
445                .map_err(|e| MemError::PendingStore { source: e })
446        })
447        .await
448        .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
449    }
450
451    /// Sweep all expired pending-adjudication rows and orphaned QueuedForAdjudication claims.
452    ///
453    /// For each expired pending row (expires_at <= now):
454    ///   1. Acquires store_write_lock + per-agent write lock (same order as ingest_claim).
455    ///   2. Atomically reverts the challenger QueuedForAdjudication → Contested + ledger entry.
456    ///   3. Marks the pending row expired.
457    ///
458    /// Then sweeps orphan claims (QueuedForAdjudication with no pending row):
459    ///   4. Per orphan: acquires locks, reverts challenger → Contested + ledger entry.
460    ///
461    /// Returns the total count of claims reverted (expired + orphan).
462    ///
463    /// The engine MUST NOT spawn a background task — the host calls this on its own schedule.
464    ///
465    /// If no pending store is configured, returns `Ok(0)` (sweep is a no-op without oracle queue).
466    ///
467    /// # Postgres / async-runtime safety
468    ///
469    /// ALL pending-store reads (`list_expired`, `list_queued_orphan_claims`) are performed
470    /// inside `spawn_blocking` so no `postgres::Client` is created or dropped on the tokio
471    /// executor thread (same invariant as `submit_adjudication`).
472    pub async fn sweep_expired_adjudications(&self) -> Result<usize, MemError> {
473        let now = Utc::now();
474
475        let pending_store = match &self.pending_store {
476            Some(ps) => Arc::clone(ps),
477            None => return Ok(0),
478        };
479
480        // ── Phase 1: Collect expired rows via spawn_blocking (NO async-context DB access) ──
481        let ps_for_list = Arc::clone(&pending_store);
482        let expired_rows = task::spawn_blocking(move || {
483            ps_for_list
484                .list_expired_erased(now)
485                .map_err(|e| MemError::PendingStore { source: e })
486        })
487        .await
488        .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })??;
489
490        let mut swept = 0usize;
491
492        for row in expired_rows {
493            let agent_id = row.agent_id.clone();
494
495            let _store_lock = if self.persistence.requires_global_write_serialization() {
496                Some(self.store_write_lock.lock().await)
497            } else {
498                None
499            };
500            let _guard = self.write_locks.acquire(&agent_id).await;
501
502            let persistence = Arc::clone(&self.persistence);
503            let ps = Arc::clone(&pending_store);
504            let row_clone = row.clone();
505
506            let result = task::spawn_blocking(move || {
507                let uc = SweepAdjudicationsUseCase::new(persistence, ps);
508                uc.revert_expired_row(&row_clone, now)
509            })
510            .await
511            .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })??;
512
513            if result {
514                swept += 1;
515            }
516        }
517
518        // ── Phase 2: Collect orphans via spawn_blocking (NO async-context DB access) ──
519        // Detect QueuedForAdjudication claims with no matching pending row.
520        let ps_for_orphans = Arc::clone(&pending_store);
521        let orphans = task::spawn_blocking(move || {
522            ps_for_orphans
523                .list_queued_orphan_claims_erased()
524                .map_err(|e| MemError::PendingStore { source: e })
525        })
526        .await
527        .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })??;
528
529        for orphan in orphans {
530            let agent_id = orphan.agent_id.clone();
531
532            let _store_lock = if self.persistence.requires_global_write_serialization() {
533                Some(self.store_write_lock.lock().await)
534            } else {
535                None
536            };
537            let _guard = self.write_locks.acquire(&agent_id).await;
538
539            let persistence = Arc::clone(&self.persistence);
540            let ps = Arc::clone(&pending_store);
541            let orphan_clone = orphan.clone();
542
543            let result = task::spawn_blocking(move || {
544                let uc = SweepAdjudicationsUseCase::new(persistence, ps);
545                uc.revert_orphan(&orphan_clone, now)
546            })
547            .await
548            .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })??;
549
550            if result {
551                swept += 1;
552            }
553        }
554
555        Ok(swept)
556    }
557}
558
559impl<P, O, V> Clone for EngineHandle<P, O, V>
560where
561    P: PersistencePort + Send + Sync + 'static,
562    O: OraclePort + Send + Sync + 'static,
563    V: VectorPort + Send + Sync + 'static,
564{
565    fn clone(&self) -> Self {
566        Self {
567            persistence: Arc::clone(&self.persistence),
568            oracle: self.oracle.clone(),
569            vector: self.vector.clone(),
570            pending_store: self.pending_store.clone(),
571            config: self.config.clone(),
572            write_locks: self.write_locks.clone(),
573            store_write_lock: Arc::clone(&self.store_write_lock),
574        }
575    }
576}