1use std::sync::Arc;
20
21use chrono::Utc;
22use tokio::task;
23
24use crate::{
25 application::{
26 audit::AuditUseCase,
27 dto::{
28 AuditQueryRequest, AuditQueryResponse, IngestClaimRequest, IngestClaimResponse,
29 QueryHistoryRequest, QueryHistoryResponse, QueryMemoryRequest, QueryMemoryResponse,
30 ReconcileRequest, ReconcileResponse,
31 },
32 ingest_claim::IngestClaimUseCase,
33 query_history::QueryHistoryUseCase,
34 query_memory::QueryMemoryUseCase,
35 reconcile::ReconcileUseCase,
36 submit_adjudication::SubmitAdjudicationUseCase,
37 sweep_adjudications::SweepAdjudicationsUseCase,
38 },
39 concurrency::agent_lock::AgentWriteLockMap,
40 config::EngineConfig,
41 error::MemError,
42 ports::{OraclePort, PendingAdjudicationPort, PersistencePort, VectorPort},
43};
44
45#[allow(missing_docs)]
55pub trait ErasedPendingStore: Send + Sync + 'static {
56 fn insert_pending_erased(
57 &self,
58 row: &crate::ports::pending_adjudication::PendingAdjudicationRow,
59 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;
60
61 fn get_pending_erased(
62 &self,
63 handle_id: uuid::Uuid,
64 ) -> Result<Option<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>>;
65
66 fn list_pending_erased(
67 &self,
68 agent_id: Option<&mempill_types::AgentId>,
69 ) -> Result<Vec<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>>;
70
71 fn list_expired_erased(
72 &self,
73 now: chrono::DateTime<chrono::Utc>,
74 ) -> Result<Vec<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>>;
75
76 fn mark_resolved_erased(
77 &self,
78 handle_id: uuid::Uuid,
79 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;
80
81 fn mark_expired_erased(
82 &self,
83 handle_id: uuid::Uuid,
84 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;
85
86 fn list_queued_orphan_claims_erased(
87 &self,
88 ) -> Result<Vec<crate::ports::pending_adjudication::OrphanedQueuedClaim>, Box<dyn std::error::Error + Send + Sync + 'static>>;
89}
90
91pub struct ErasedPendingStoreAdapter<S: PendingAdjudicationPort> {
93 inner: S,
94}
95
96impl<S: PendingAdjudicationPort> ErasedPendingStoreAdapter<S> {
97 pub fn new(inner: S) -> Self {
99 Self { inner }
100 }
101}
102
103impl<S: PendingAdjudicationPort> ErasedPendingStore for ErasedPendingStoreAdapter<S> {
104 fn insert_pending_erased(
105 &self,
106 row: &crate::ports::pending_adjudication::PendingAdjudicationRow,
107 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
108 self.inner.insert_pending(row).map_err(|e| Box::new(e) as _)
109 }
110
111 fn get_pending_erased(
112 &self,
113 handle_id: uuid::Uuid,
114 ) -> Result<Option<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>> {
115 self.inner.get_pending(handle_id).map_err(|e| Box::new(e) as _)
116 }
117
118 fn list_pending_erased(
119 &self,
120 agent_id: Option<&mempill_types::AgentId>,
121 ) -> Result<Vec<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>> {
122 self.inner.list_pending(agent_id).map_err(|e| Box::new(e) as _)
123 }
124
125 fn list_expired_erased(
126 &self,
127 now: chrono::DateTime<chrono::Utc>,
128 ) -> Result<Vec<crate::ports::pending_adjudication::PendingAdjudicationRow>, Box<dyn std::error::Error + Send + Sync + 'static>> {
129 self.inner.list_expired(now).map_err(|e| Box::new(e) as _)
130 }
131
132 fn mark_resolved_erased(
133 &self,
134 handle_id: uuid::Uuid,
135 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
136 self.inner.mark_resolved(handle_id).map_err(|e| Box::new(e) as _)
137 }
138
139 fn mark_expired_erased(
140 &self,
141 handle_id: uuid::Uuid,
142 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
143 self.inner.mark_expired(handle_id).map_err(|e| Box::new(e) as _)
144 }
145
146 fn list_queued_orphan_claims_erased(
147 &self,
148 ) -> Result<Vec<crate::ports::pending_adjudication::OrphanedQueuedClaim>, Box<dyn std::error::Error + Send + Sync + 'static>> {
149 self.inner.list_queued_orphan_claims().map_err(|e| Box::new(e) as _)
150 }
151}
152
153pub struct EngineHandle<P, O, V>
160where
161 P: PersistencePort + Send + Sync + 'static,
162 O: OraclePort + Send + Sync + 'static,
163 V: VectorPort + Send + Sync + 'static,
164{
165 persistence: Arc<P>,
166 oracle: Option<Arc<O>>,
167 vector: Option<Arc<V>>,
168 pending_store: Option<Arc<dyn ErasedPendingStore>>,
170 config: EngineConfig,
171 write_locks: AgentWriteLockMap,
172 store_write_lock: Arc<tokio::sync::Mutex<()>>,
176}
177
178impl<P, O, V> EngineHandle<P, O, V>
179where
180 P: PersistencePort + Send + Sync + 'static,
181 O: OraclePort + Send + Sync + 'static,
182 V: VectorPort + Send + Sync + 'static,
183{
184 pub fn new(
190 persistence: Arc<P>,
191 oracle: Option<Arc<O>>,
192 vector: Option<Arc<V>>,
193 config: EngineConfig,
194 ) -> Self {
195 Self {
196 persistence,
197 oracle,
198 vector,
199 pending_store: None,
200 config,
201 write_locks: AgentWriteLockMap::new(),
202 store_write_lock: Arc::new(tokio::sync::Mutex::new(())),
203 }
204 }
205
206 pub fn new_with_pending_store<S>(
222 persistence: Arc<P>,
223 oracle: Option<Arc<O>>,
224 vector: Option<Arc<V>>,
225 pending_store: Arc<dyn ErasedPendingStore>,
226 config: EngineConfig,
227 ) -> Self {
228 Self {
229 persistence,
230 oracle,
231 vector,
232 pending_store: Some(pending_store),
233 config,
234 write_locks: AgentWriteLockMap::new(),
235 store_write_lock: Arc::new(tokio::sync::Mutex::new(())),
236 }
237 }
238
239 pub async fn ingest_claim(
247 &self,
248 req: IngestClaimRequest,
249 ) -> Result<IngestClaimResponse, MemError> {
250 let now = Utc::now(); let _store_lock = if self.persistence.requires_global_write_serialization() {
253 Some(self.store_write_lock.lock().await)
254 } else {
255 None
256 };
257 let _guard = self.write_locks.acquire(&req.agent_id).await;
258 let uc = IngestClaimUseCase::new(
259 Arc::clone(&self.persistence),
260 self.oracle.clone(),
261 self.pending_store.clone(),
262 self.config.clone(),
263 );
264 task::spawn_blocking(move || uc.execute_with_time(req, now))
265 .await
266 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
267 }
268
269 pub async fn query_memory(
273 &self,
274 req: QueryMemoryRequest,
275 ) -> Result<QueryMemoryResponse, MemError> {
276 let now = Utc::now();
277 let uc = QueryMemoryUseCase::new(
278 Arc::clone(&self.persistence),
279 self.vector.clone(),
280 self.config.clone(),
281 );
282 task::spawn_blocking(move || uc.execute_with_time(req, now))
283 .await
284 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
285 }
286
287 pub async fn query_history(
295 &self,
296 req: QueryHistoryRequest,
297 ) -> Result<QueryHistoryResponse, MemError> {
298 let now = Utc::now();
299 let uc = QueryHistoryUseCase::new(
300 Arc::clone(&self.persistence),
301 self.vector.clone(),
302 self.config.clone(),
303 );
304 task::spawn_blocking(move || uc.execute_with_time(req, now))
305 .await
306 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
307 }
308
309 pub async fn reconcile(
313 &self,
314 req: ReconcileRequest,
315 ) -> Result<ReconcileResponse, MemError> {
316 let _store_lock = if self.persistence.requires_global_write_serialization() {
318 Some(self.store_write_lock.lock().await)
319 } else {
320 None
321 };
322 let _guard = self.write_locks.acquire(&req.agent_id).await;
323 let uc = ReconcileUseCase::new(
324 Arc::clone(&self.persistence),
325 self.oracle.clone(),
326 self.config.clone(),
327 );
328 task::spawn_blocking(move || uc.execute(req))
329 .await
330 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
331 }
332
333 pub async fn query_audit(
335 &self,
336 req: AuditQueryRequest,
337 ) -> Result<AuditQueryResponse, MemError> {
338 let uc = AuditUseCase::new(Arc::clone(&self.persistence));
339 task::spawn_blocking(move || uc.execute(req))
340 .await
341 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
342 }
343
344 pub async fn submit_adjudication(
373 &self,
374 handle_id: uuid::Uuid,
375 response: mempill_types::AdjudicationResponse,
376 ) -> Result<mempill_types::AdjudicationOutcome, MemError> {
377 let now = Utc::now(); let pending_store = self.pending_store.as_ref()
387 .ok_or(MemError::AdjudicationHandleNotFound { handle_id })?;
388 let pending_store_arc = Arc::clone(pending_store);
389
390 let resolve_result = task::spawn_blocking(move || {
391 let row = pending_store_arc
392 .get_pending_erased(handle_id)
393 .map_err(|e| MemError::PendingStore { source: e })?
394 .ok_or(MemError::AdjudicationHandleNotFound { handle_id })?;
395 Ok::<_, MemError>(row)
396 })
397 .await
398 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })??;
399
400 let row = resolve_result;
401
402 let agent_id = row.agent_id.clone();
406
407 let _store_lock = if self.persistence.requires_global_write_serialization() {
409 Some(self.store_write_lock.lock().await)
410 } else {
411 None
412 };
413 let _guard = self.write_locks.acquire(&agent_id).await;
414
415 let pending_store_arc2 = Arc::clone(pending_store);
417 let uc = SubmitAdjudicationUseCase::new(
418 Arc::clone(&self.persistence),
419 pending_store_arc2,
420 );
421 task::spawn_blocking(move || uc.execute(handle_id, response, now))
422 .await
423 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
424 }
425
426 pub async fn list_pending_adjudications(
434 &self,
435 agent_id: Option<mempill_types::AgentId>,
436 ) -> Result<Vec<crate::ports::pending_adjudication::PendingAdjudicationRow>, MemError> {
437 let pending_store = match &self.pending_store {
438 Some(ps) => Arc::clone(ps),
439 None => return Ok(vec![]),
440 };
441
442 task::spawn_blocking(move || {
443 pending_store
444 .list_pending_erased(agent_id.as_ref())
445 .map_err(|e| MemError::PendingStore { source: e })
446 })
447 .await
448 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })?
449 }
450
451 pub async fn sweep_expired_adjudications(&self) -> Result<usize, MemError> {
473 let now = Utc::now();
474
475 let pending_store = match &self.pending_store {
476 Some(ps) => Arc::clone(ps),
477 None => return Ok(0),
478 };
479
480 let ps_for_list = Arc::clone(&pending_store);
482 let expired_rows = task::spawn_blocking(move || {
483 ps_for_list
484 .list_expired_erased(now)
485 .map_err(|e| MemError::PendingStore { source: e })
486 })
487 .await
488 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })??;
489
490 let mut swept = 0usize;
491
492 for row in expired_rows {
493 let agent_id = row.agent_id.clone();
494
495 let _store_lock = if self.persistence.requires_global_write_serialization() {
496 Some(self.store_write_lock.lock().await)
497 } else {
498 None
499 };
500 let _guard = self.write_locks.acquire(&agent_id).await;
501
502 let persistence = Arc::clone(&self.persistence);
503 let ps = Arc::clone(&pending_store);
504 let row_clone = row.clone();
505
506 let result = task::spawn_blocking(move || {
507 let uc = SweepAdjudicationsUseCase::new(persistence, ps);
508 uc.revert_expired_row(&row_clone, now)
509 })
510 .await
511 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })??;
512
513 if result {
514 swept += 1;
515 }
516 }
517
518 let ps_for_orphans = Arc::clone(&pending_store);
521 let orphans = task::spawn_blocking(move || {
522 ps_for_orphans
523 .list_queued_orphan_claims_erased()
524 .map_err(|e| MemError::PendingStore { source: e })
525 })
526 .await
527 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })??;
528
529 for orphan in orphans {
530 let agent_id = orphan.agent_id.clone();
531
532 let _store_lock = if self.persistence.requires_global_write_serialization() {
533 Some(self.store_write_lock.lock().await)
534 } else {
535 None
536 };
537 let _guard = self.write_locks.acquire(&agent_id).await;
538
539 let persistence = Arc::clone(&self.persistence);
540 let ps = Arc::clone(&pending_store);
541 let orphan_clone = orphan.clone();
542
543 let result = task::spawn_blocking(move || {
544 let uc = SweepAdjudicationsUseCase::new(persistence, ps);
545 uc.revert_orphan(&orphan_clone, now)
546 })
547 .await
548 .map_err(|e| MemError::SpawnBlocking { reason: e.to_string() })??;
549
550 if result {
551 swept += 1;
552 }
553 }
554
555 Ok(swept)
556 }
557}
558
559impl<P, O, V> Clone for EngineHandle<P, O, V>
560where
561 P: PersistencePort + Send + Sync + 'static,
562 O: OraclePort + Send + Sync + 'static,
563 V: VectorPort + Send + Sync + 'static,
564{
565 fn clone(&self) -> Self {
566 Self {
567 persistence: Arc::clone(&self.persistence),
568 oracle: self.oracle.clone(),
569 vector: self.vector.clone(),
570 pending_store: self.pending_store.clone(),
571 config: self.config.clone(),
572 write_locks: self.write_locks.clone(),
573 store_write_lock: Arc::clone(&self.store_write_lock),
574 }
575 }
576}