Skip to main content

mnemara_server/
lib.rs

1#![forbid(unsafe_code)]
2
3mod admission;
4mod observability;
5
6use std::net::SocketAddr;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9
10use admission::{
11    AdmissionClass, AdmissionConfig, AdmissionController, AdmissionError, RuntimeAdmissionStatus,
12};
13use axum::extract::{Path, Query, Request as AxumRequest};
14use axum::http::HeaderMap;
15use axum::http::StatusCode;
16use axum::middleware::{self, Next};
17use axum::response::IntoResponse;
18use axum::routing::{get, post};
19use axum::{Json, Router};
20use mnemara_core::{
21    AffectiveAnnotation, AffectiveAnnotationProvenance, ArchiveReceipt, ArchiveRequest,
22    ArtifactPointer, BatchUpsertRequest, CompactionReport, CompactionRequest, DeleteReceipt,
23    DeleteRequest, EPISODE_SCHEMA_VERSION, EmbeddingProviderKind, EngineTuningInfo, EpisodeContext,
24    EpisodeContinuityState, EpisodeSalience, ExportRequest, ImportMode, ImportReport,
25    ImportRequest, IntegrityCheckReport, IntegrityCheckRequest, LineageLink, LineageRelationKind,
26    MaintenanceStats, MemoryHistoricalState, MemoryQualityState, MemoryRecord, MemoryRecordKind,
27    MemoryScope, MemoryStore, MemoryTrustLevel, NamespaceStats, OperationTrace,
28    OperationTraceSummary, PortableRecord, PortableStorePackage, RecallCandidateSource,
29    RecallExplanation, RecallFilters, RecallHistoricalMode, RecallPlannerStage,
30    RecallPlanningProfile, RecallPlanningTrace, RecallPolicyProfile, RecallQuery, RecallResult,
31    RecallScorerKind, RecallScoringProfile, RecallTemporalOrder, RecallTraceCandidate,
32    RecoverReceipt, RecoverRequest, RepairReport, RepairRequest, SnapshotManifest,
33    StoreStatsReport, StoreStatsRequest, SuppressReceipt, SuppressRequest, TraceListRequest,
34    TraceOperationKind, TraceStatus, UpsertReceipt, UpsertRequest,
35};
36use mnemara_protocol::v1::memory_service_server::{MemoryService, MemoryServiceServer};
37use mnemara_protocol::v1::{
38    AffectiveAnnotation as ProtoAffectiveAnnotation, ArchiveReply,
39    ArchiveRequest as ProtoArchiveRequest, ArtifactPointer as ProtoArtifactPointer,
40    BatchUpsertMemoryRecordsReply, BatchUpsertMemoryRecordsRequest, CompactReply,
41    CompactRequest as ProtoCompactRequest, DeleteReply, DeleteRequest as ProtoDeleteRequest,
42    EmbeddingProviderKind as ProtoEmbeddingProviderKind, EngineTuningInfo as ProtoEngineTuningInfo,
43    EpisodeContext as ProtoEpisodeContext, EpisodeSalience as ProtoEpisodeSalience,
44    ExportReply as ProtoExportReply, ExportRequest as ProtoExportRequest,
45    GetTraceRequest as ProtoGetTraceRequest, ImportMode as ProtoImportMode,
46    ImportReply as ProtoImportReply, ImportRequest as ProtoImportRequest,
47    IntegrityCheckReply as ProtoIntegrityCheckReply,
48    IntegrityCheckRequest as ProtoIntegrityCheckRequest, LineageLink as ProtoLineageLink,
49    ListTracesReply as ProtoListTracesReply, ListTracesRequest as ProtoListTracesRequest,
50    MaintenanceStats as ProtoMaintenanceStats, MemoryRecord as ProtoMemoryRecord,
51    MemoryScope as ProtoMemoryScope, NamespaceStats as ProtoNamespaceStats,
52    OperationTrace as ProtoOperationTrace, OperationTraceSummary as ProtoOperationTraceSummary,
53    PortableRecord as ProtoPortableRecord, RecallExplanation as ProtoRecallExplanation,
54    RecallFilters as ProtoRecallFilters, RecallHit as ProtoRecallHit,
55    RecallPlanningTrace as ProtoRecallPlanningTrace,
56    RecallPolicyProfile as ProtoRecallPolicyProfile, RecallReply,
57    RecallRequest as ProtoRecallRequest, RecallScoreBreakdown as ProtoRecallScoreBreakdown,
58    RecallScorerKind as ProtoRecallScorerKind, RecallScoringProfile as ProtoRecallScoringProfile,
59    RecallTraceCandidate as ProtoRecallTraceCandidate, RecoverReply,
60    RecoverRequest as ProtoRecoverRequest, RepairReply as ProtoRepairReply,
61    RepairRequest as ProtoRepairRequest, SnapshotReply, SnapshotRequest,
62    StoreStatsReply as ProtoStoreStatsReply, StoreStatsRequest as ProtoStoreStatsRequest,
63    SuppressReply, SuppressRequest as ProtoSuppressRequest,
64    TraceOperationKind as ProtoTraceOperationKind, TraceStatus as ProtoTraceStatus,
65    UpsertMemoryRecordReply, UpsertMemoryRecordRequest as ProtoUpsertMemoryRecordRequest,
66};
67use observability::{TraceRegistry, TraceRegistrySnapshot, now_unix_ms};
68use tonic::{Request, Response, Status};
69
70#[derive(Debug, Clone, serde::Serialize)]
71struct HealthStatus {
72    status: &'static str,
73}
74
75#[derive(Debug, Clone, serde::Serialize)]
76struct ReadyStatus {
77    status: &'static str,
78    record_count: u64,
79    namespaces: Vec<String>,
80}
81
82#[derive(Debug, Clone, serde::Serialize)]
83struct ServerRuntimeStatus {
84    backend: String,
85    admission: RuntimeAdmissionStatus,
86    traces: TraceRegistrySnapshot,
87}
88
89#[derive(Debug, Clone, serde::Serialize)]
90struct HttpErrorBody {
91    error: String,
92}
93
/// Per-method request counters, one atomic counter per tracked outcome.
///
/// All counters start at zero via `Default`.
#[derive(Debug, Default)]
struct MethodMetrics {
    /// Requests that have started.
    started: AtomicU64,
    /// Requests that finished with an OK status.
    ok: AtomicU64,
    /// Requests rejected as invalid-argument.
    invalid_argument: AtomicU64,
    /// Requests that finished with a conflict (already-exists) status.
    conflict: AtomicU64,
    /// Requests that hit an unimplemented operation.
    unimplemented: AtomicU64,
    /// Requests that finished with any other status.
    internal: AtomicU64,
}
103
104impl MethodMetrics {
105    fn record_started(&self) {
106        self.started.fetch_add(1, Ordering::Relaxed);
107    }
108
109    fn record_status(&self, status: &Status) {
110        match status.code() {
111            tonic::Code::Ok => {
112                self.ok.fetch_add(1, Ordering::Relaxed);
113            }
114            tonic::Code::InvalidArgument => {
115                self.invalid_argument.fetch_add(1, Ordering::Relaxed);
116            }
117            tonic::Code::AlreadyExists => {
118                self.conflict.fetch_add(1, Ordering::Relaxed);
119            }
120            tonic::Code::Unimplemented => {
121                self.unimplemented.fetch_add(1, Ordering::Relaxed);
122            }
123            _ => {
124                self.internal.fetch_add(1, Ordering::Relaxed);
125            }
126        }
127    }
128}
129
130#[derive(Debug, Default)]
131pub struct ServerMetrics {
132    grpc_upsert: MethodMetrics,
133    grpc_batch_upsert: MethodMetrics,
134    grpc_recall: MethodMetrics,
135    grpc_compact: MethodMetrics,
136    grpc_snapshot: MethodMetrics,
137    grpc_delete: MethodMetrics,
138    grpc_archive: MethodMetrics,
139    grpc_suppress: MethodMetrics,
140    grpc_recover: MethodMetrics,
141    grpc_stats: MethodMetrics,
142    grpc_integrity: MethodMetrics,
143    grpc_repair: MethodMetrics,
144    http_healthz: AtomicU64,
145    http_readyz: AtomicU64,
146    http_snapshot: AtomicU64,
147    http_compact: AtomicU64,
148    http_delete: AtomicU64,
149    http_archive: AtomicU64,
150    http_suppress: AtomicU64,
151    http_recover: AtomicU64,
152    http_metrics: AtomicU64,
153    http_traces: AtomicU64,
154    http_runtime: AtomicU64,
155    http_export: AtomicU64,
156    http_import: AtomicU64,
157    http_rejected_body_too_large: AtomicU64,
158    admission_rejected: AtomicU64,
159    admission_timed_out: AtomicU64,
160    trace_records: AtomicU64,
161    trace_evictions: AtomicU64,
162    trace_reads: AtomicU64,
163}
164
165impl ServerMetrics {
166    pub fn render(&self) -> String {
167        let mut output = String::new();
168        append_counter(
169            &mut output,
170            "mnemara_grpc_upsert_requests_started_total",
171            self.grpc_upsert.started.load(Ordering::Relaxed),
172        );
173        append_counter(
174            &mut output,
175            "mnemara_grpc_upsert_requests_ok_total",
176            self.grpc_upsert.ok.load(Ordering::Relaxed),
177        );
178        append_counter(
179            &mut output,
180            "mnemara_grpc_upsert_requests_invalid_argument_total",
181            self.grpc_upsert.invalid_argument.load(Ordering::Relaxed),
182        );
183        append_counter(
184            &mut output,
185            "mnemara_grpc_upsert_requests_conflict_total",
186            self.grpc_upsert.conflict.load(Ordering::Relaxed),
187        );
188        append_counter(
189            &mut output,
190            "mnemara_grpc_upsert_requests_unimplemented_total",
191            self.grpc_upsert.unimplemented.load(Ordering::Relaxed),
192        );
193        append_counter(
194            &mut output,
195            "mnemara_grpc_upsert_requests_internal_total",
196            self.grpc_upsert.internal.load(Ordering::Relaxed),
197        );
198        append_counter(
199            &mut output,
200            "mnemara_grpc_batch_upsert_requests_started_total",
201            self.grpc_batch_upsert.started.load(Ordering::Relaxed),
202        );
203        append_counter(
204            &mut output,
205            "mnemara_grpc_batch_upsert_requests_ok_total",
206            self.grpc_batch_upsert.ok.load(Ordering::Relaxed),
207        );
208        append_counter(
209            &mut output,
210            "mnemara_grpc_batch_upsert_requests_invalid_argument_total",
211            self.grpc_batch_upsert
212                .invalid_argument
213                .load(Ordering::Relaxed),
214        );
215        append_counter(
216            &mut output,
217            "mnemara_grpc_recall_requests_started_total",
218            self.grpc_recall.started.load(Ordering::Relaxed),
219        );
220        append_counter(
221            &mut output,
222            "mnemara_grpc_recall_requests_ok_total",
223            self.grpc_recall.ok.load(Ordering::Relaxed),
224        );
225        append_counter(
226            &mut output,
227            "mnemara_grpc_recall_requests_invalid_argument_total",
228            self.grpc_recall.invalid_argument.load(Ordering::Relaxed),
229        );
230        append_counter(
231            &mut output,
232            "mnemara_grpc_compact_requests_started_total",
233            self.grpc_compact.started.load(Ordering::Relaxed),
234        );
235        append_counter(
236            &mut output,
237            "mnemara_grpc_compact_requests_ok_total",
238            self.grpc_compact.ok.load(Ordering::Relaxed),
239        );
240        append_counter(
241            &mut output,
242            "mnemara_grpc_snapshot_requests_started_total",
243            self.grpc_snapshot.started.load(Ordering::Relaxed),
244        );
245        append_counter(
246            &mut output,
247            "mnemara_grpc_snapshot_requests_ok_total",
248            self.grpc_snapshot.ok.load(Ordering::Relaxed),
249        );
250        append_counter(
251            &mut output,
252            "mnemara_grpc_delete_requests_started_total",
253            self.grpc_delete.started.load(Ordering::Relaxed),
254        );
255        append_counter(
256            &mut output,
257            "mnemara_grpc_delete_requests_ok_total",
258            self.grpc_delete.ok.load(Ordering::Relaxed),
259        );
260        append_counter(
261            &mut output,
262            "mnemara_grpc_archive_requests_started_total",
263            self.grpc_archive.started.load(Ordering::Relaxed),
264        );
265        append_counter(
266            &mut output,
267            "mnemara_grpc_archive_requests_ok_total",
268            self.grpc_archive.ok.load(Ordering::Relaxed),
269        );
270        append_counter(
271            &mut output,
272            "mnemara_grpc_suppress_requests_started_total",
273            self.grpc_suppress.started.load(Ordering::Relaxed),
274        );
275        append_counter(
276            &mut output,
277            "mnemara_grpc_suppress_requests_ok_total",
278            self.grpc_suppress.ok.load(Ordering::Relaxed),
279        );
280        append_counter(
281            &mut output,
282            "mnemara_grpc_recover_requests_started_total",
283            self.grpc_recover.started.load(Ordering::Relaxed),
284        );
285        append_counter(
286            &mut output,
287            "mnemara_grpc_recover_requests_ok_total",
288            self.grpc_recover.ok.load(Ordering::Relaxed),
289        );
290        append_counter(
291            &mut output,
292            "mnemara_http_healthz_requests_total",
293            self.http_healthz.load(Ordering::Relaxed),
294        );
295        append_counter(
296            &mut output,
297            "mnemara_http_readyz_requests_total",
298            self.http_readyz.load(Ordering::Relaxed),
299        );
300        append_counter(
301            &mut output,
302            "mnemara_http_snapshot_requests_total",
303            self.http_snapshot.load(Ordering::Relaxed),
304        );
305        append_counter(
306            &mut output,
307            "mnemara_http_compact_requests_total",
308            self.http_compact.load(Ordering::Relaxed),
309        );
310        append_counter(
311            &mut output,
312            "mnemara_http_delete_requests_total",
313            self.http_delete.load(Ordering::Relaxed),
314        );
315        append_counter(
316            &mut output,
317            "mnemara_http_archive_requests_total",
318            self.http_archive.load(Ordering::Relaxed),
319        );
320        append_counter(
321            &mut output,
322            "mnemara_http_suppress_requests_total",
323            self.http_suppress.load(Ordering::Relaxed),
324        );
325        append_counter(
326            &mut output,
327            "mnemara_http_recover_requests_total",
328            self.http_recover.load(Ordering::Relaxed),
329        );
330        append_counter(
331            &mut output,
332            "mnemara_http_metrics_requests_total",
333            self.http_metrics.load(Ordering::Relaxed),
334        );
335        append_counter(
336            &mut output,
337            "mnemara_http_traces_requests_total",
338            self.http_traces.load(Ordering::Relaxed),
339        );
340        append_counter(
341            &mut output,
342            "mnemara_http_runtime_requests_total",
343            self.http_runtime.load(Ordering::Relaxed),
344        );
345        append_counter(
346            &mut output,
347            "mnemara_http_export_requests_total",
348            self.http_export.load(Ordering::Relaxed),
349        );
350        append_counter(
351            &mut output,
352            "mnemara_http_import_requests_total",
353            self.http_import.load(Ordering::Relaxed),
354        );
355        append_counter(
356            &mut output,
357            "mnemara_http_rejected_body_too_large_total",
358            self.http_rejected_body_too_large.load(Ordering::Relaxed),
359        );
360        append_counter(
361            &mut output,
362            "mnemara_admission_rejected_total",
363            self.admission_rejected.load(Ordering::Relaxed),
364        );
365        append_counter(
366            &mut output,
367            "mnemara_admission_timed_out_total",
368            self.admission_timed_out.load(Ordering::Relaxed),
369        );
370        append_counter(
371            &mut output,
372            "mnemara_trace_records_total",
373            self.trace_records.load(Ordering::Relaxed),
374        );
375        append_counter(
376            &mut output,
377            "mnemara_trace_evictions_total",
378            self.trace_evictions.load(Ordering::Relaxed),
379        );
380        append_counter(
381            &mut output,
382            "mnemara_trace_reads_total",
383            self.trace_reads.load(Ordering::Relaxed),
384        );
385        output
386    }
387}
388
/// Appends one counter to `output` as two lines: a `# TYPE <name> counter`
/// header followed by `<name> <value>`, each terminated by a newline.
fn append_counter(output: &mut String, name: &str, value: u64) {
    let rendered = format!("# TYPE {name} counter\n{name} {value}\n");
    output.push_str(&rendered);
}
398
/// Tunable server limits: request-size bounds, concurrency caps and trace
/// retention.
///
/// The concurrency and queue fields are forwarded to the admission controller
/// and `trace_retention` to the trace registry when a `ServerRuntime` is built;
/// the size bounds are presumably enforced by the request handlers (not shown
/// in this section).
#[derive(Debug, Clone)]
pub struct ServerLimits {
    /// Maximum HTTP body size, in bytes.
    pub max_http_body_bytes: usize,
    /// Maximum number of requests in one batch upsert.
    pub max_batch_upsert_requests: usize,
    /// Maximum number of items for a recall.
    pub max_recall_items: usize,
    /// Maximum query text size, in bytes.
    pub max_query_text_bytes: usize,
    /// Maximum record content size, in bytes.
    pub max_record_content_bytes: usize,
    /// Maximum number of labels per scope.
    pub max_labels_per_scope: usize,
    /// Maximum concurrently admitted read requests.
    pub max_inflight_reads: usize,
    /// Maximum concurrently admitted write requests.
    pub max_inflight_writes: usize,
    /// Maximum concurrently admitted admin requests.
    pub max_inflight_admin: usize,
    /// Maximum number of requests waiting in the admission queue.
    pub max_queued_requests: usize,
    /// Maximum concurrently admitted requests per tenant.
    pub max_tenant_inflight: usize,
    /// How long a queued request may wait for admission, in milliseconds.
    pub queue_wait_timeout_ms: u64,
    /// Retention setting handed to the trace registry.
    pub trace_retention: usize,
}

impl Default for ServerLimits {
    /// Built-in defaults: 64 KiB bodies, 32 KiB records, 4 KiB query text,
    /// modest concurrency caps and a 2 second queue wait.
    fn default() -> Self {
        const KIB: usize = 1024;

        Self {
            // Size and count bounds.
            max_http_body_bytes: 64 * KIB,
            max_record_content_bytes: 32 * KIB,
            max_query_text_bytes: 4 * KIB,
            max_batch_upsert_requests: 128,
            max_recall_items: 64,
            max_labels_per_scope: 32,
            // Admission control.
            max_inflight_reads: 64,
            max_inflight_writes: 32,
            max_inflight_admin: 8,
            max_tenant_inflight: 16,
            max_queued_requests: 256,
            queue_wait_timeout_ms: 2_000,
            // Observability.
            trace_retention: 256,
        }
    }
}
435
/// Authentication settings: an optional all-access bearer token, a flag for
/// protecting the metrics endpoint, and per-token permission policies.
///
/// `Debug` is implemented by hand so the bearer token is never written to logs
/// or panic messages; only whether one is configured is shown.
#[derive(Clone, Default)]
pub struct AuthConfig {
    /// Token that is accepted for every permission when set.
    pub bearer_token: Option<String>,
    /// Whether the metrics endpoint requires authorization.
    pub protect_metrics: bool,
    /// Additional tokens, each limited to its listed permissions.
    pub token_policies: Vec<TokenPolicy>,
}

impl std::fmt::Debug for AuthConfig {
    /// Formats like the derived `Debug`, except that the bearer token value is
    /// replaced by a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AuthConfig")
            .field(
                "bearer_token",
                &self.bearer_token.as_ref().map(|_| "<redacted>"),
            )
            .field("protect_metrics", &self.protect_metrics)
            .field("token_policies", &self.token_policies)
            .finish()
    }
}

/// Permission classes a token can be granted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuthPermission {
    Read,
    Write,
    Admin,
    Metrics,
}

/// A token together with the permissions it grants.
///
/// `Debug` is implemented by hand so the token value is never written to logs
/// or assertion failure messages.
#[derive(Clone, PartialEq, Eq)]
pub struct TokenPolicy {
    /// Secret token presented by the client.
    pub token: String,
    /// Permissions granted to the holder of `token`.
    pub permissions: Vec<AuthPermission>,
}

impl std::fmt::Debug for TokenPolicy {
    /// Formats like the derived `Debug`, except that the token value is
    /// replaced by a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TokenPolicy")
            .field("token", &"<redacted>")
            .field("permissions", &self.permissions)
            .finish()
    }
}
456
457impl AuthConfig {
458    fn has_authentication(&self) -> bool {
459        self.bearer_token.is_some() || !self.token_policies.is_empty()
460    }
461
462    fn authorize(&self, authorization_header: Option<&str>, permission: AuthPermission) -> bool {
463        if !self.has_authentication() {
464            return true;
465        }
466
467        let Some(token) = bearer_token_from_header(authorization_header) else {
468            return false;
469        };
470
471        if self.bearer_token.as_deref() == Some(token) {
472            return true;
473        }
474
475        self.token_policies
476            .iter()
477            .any(|policy| policy.token == token && policy.permissions.contains(&permission))
478    }
479}
480
481#[derive(Debug, Clone, serde::Deserialize)]
482pub struct DeleteHttpRequest {
483    pub tenant_id: String,
484    pub namespace: String,
485    pub record_id: String,
486    pub hard_delete: bool,
487    pub audit_reason: String,
488}
489
490#[derive(Debug, Clone, serde::Deserialize)]
491pub struct ArchiveHttpRequest {
492    pub tenant_id: String,
493    pub namespace: String,
494    pub record_id: String,
495    pub dry_run: bool,
496    pub audit_reason: String,
497}
498
499#[derive(Debug, Clone, serde::Deserialize)]
500pub struct SuppressHttpRequest {
501    pub tenant_id: String,
502    pub namespace: String,
503    pub record_id: String,
504    pub dry_run: bool,
505    pub audit_reason: String,
506}
507
508#[derive(Debug, Clone, serde::Deserialize)]
509pub struct RecoverHttpRequest {
510    pub tenant_id: String,
511    pub namespace: String,
512    pub record_id: String,
513    pub dry_run: bool,
514    pub audit_reason: String,
515    pub quality_state: MemoryQualityState,
516    pub historical_state: Option<MemoryHistoricalState>,
517}
518
519pub struct GrpcMemoryService<S> {
520    store: Arc<S>,
521    runtime: ServerRuntime,
522}
523
524impl<S> Clone for GrpcMemoryService<S> {
525    fn clone(&self) -> Self {
526        Self {
527            store: Arc::clone(&self.store),
528            runtime: self.runtime.clone(),
529        }
530    }
531}
532
533#[derive(Debug)]
534struct ServerRuntimeInner {
535    backend: Arc<String>,
536    limits: Arc<ServerLimits>,
537    metrics: Arc<ServerMetrics>,
538    auth: Arc<AuthConfig>,
539    traces: Arc<TraceRegistry>,
540    admission: Arc<AdmissionController>,
541}
542
543#[derive(Debug, Clone)]
544pub struct ServerRuntime {
545    inner: Arc<ServerRuntimeInner>,
546}
547
548impl ServerRuntime {
549    pub fn new(
550        backend: impl Into<String>,
551        limits: ServerLimits,
552        metrics: Arc<ServerMetrics>,
553        auth: AuthConfig,
554    ) -> Self {
555        let admission = AdmissionController::new(AdmissionConfig {
556            max_inflight_reads: limits.max_inflight_reads,
557            max_inflight_writes: limits.max_inflight_writes,
558            max_inflight_admin: limits.max_inflight_admin,
559            max_queued_requests: limits.max_queued_requests,
560            max_tenant_inflight: limits.max_tenant_inflight,
561            queue_wait_timeout_ms: limits.queue_wait_timeout_ms,
562        });
563        Self {
564            inner: Arc::new(ServerRuntimeInner {
565                backend: Arc::new(backend.into()),
566                traces: Arc::new(TraceRegistry::new(limits.trace_retention)),
567                admission: Arc::new(admission),
568                auth: Arc::new(auth),
569                metrics,
570                limits: Arc::new(limits),
571            }),
572        }
573    }
574
575    fn limits(&self) -> &Arc<ServerLimits> {
576        &self.inner.limits
577    }
578
579    fn metrics(&self) -> &Arc<ServerMetrics> {
580        &self.inner.metrics
581    }
582
583    fn backend(&self) -> &Arc<String> {
584        &self.inner.backend
585    }
586
587    fn auth(&self) -> &Arc<AuthConfig> {
588        &self.inner.auth
589    }
590
591    fn traces(&self) -> &Arc<TraceRegistry> {
592        &self.inner.traces
593    }
594
595    fn admission(&self) -> &Arc<AdmissionController> {
596        &self.inner.admission
597    }
598}
599
600impl<S: MemoryStore> GrpcMemoryService<S> {
601    pub fn new(store: Arc<S>) -> Self {
602        let backend = store.as_ref().backend_kind().to_string();
603        Self::with_runtime(
604            store,
605            ServerRuntime::new(
606                backend,
607                ServerLimits::default(),
608                Arc::new(ServerMetrics::default()),
609                AuthConfig::default(),
610            ),
611        )
612    }
613
614    pub fn with_limits(store: Arc<S>, limits: ServerLimits) -> Self {
615        let backend = store.as_ref().backend_kind().to_string();
616        Self::with_runtime(
617            store,
618            ServerRuntime::new(
619                backend,
620                limits,
621                Arc::new(ServerMetrics::default()),
622                AuthConfig::default(),
623            ),
624        )
625    }
626
627    pub fn with_observability(
628        store: Arc<S>,
629        limits: ServerLimits,
630        metrics: Arc<ServerMetrics>,
631    ) -> Self {
632        let backend = store.as_ref().backend_kind().to_string();
633        Self::with_runtime(
634            store,
635            ServerRuntime::new(backend, limits, metrics, AuthConfig::default()),
636        )
637    }
638
639    pub fn with_runtime_config(
640        store: Arc<S>,
641        limits: ServerLimits,
642        metrics: Arc<ServerMetrics>,
643        auth: AuthConfig,
644    ) -> Self {
645        let backend = store.as_ref().backend_kind().to_string();
646        Self::with_runtime(store, ServerRuntime::new(backend, limits, metrics, auth))
647    }
648
649    pub fn with_runtime(store: Arc<S>, runtime: ServerRuntime) -> Self {
650        Self { store, runtime }
651    }
652
653    pub fn into_service(self) -> MemoryServiceServer<Self>
654    where
655        Self: MemoryService,
656    {
657        MemoryServiceServer::new(self)
658    }
659}
660
661pub async fn serve<S>(
662    addr: SocketAddr,
663    store: Arc<S>,
664    limits: ServerLimits,
665    auth: AuthConfig,
666) -> Result<(), tonic::transport::Error>
667where
668    S: MemoryStore + 'static,
669{
670    let backend = store.as_ref().backend_kind().to_string();
671    serve_with_runtime(
672        addr,
673        store,
674        ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
675    )
676    .await
677}
678
679pub async fn serve_with_runtime<S>(
680    addr: SocketAddr,
681    store: Arc<S>,
682    runtime: ServerRuntime,
683) -> Result<(), tonic::transport::Error>
684where
685    S: MemoryStore + 'static,
686{
687    tonic::transport::Server::builder()
688        .add_service(GrpcMemoryService::with_runtime(store, runtime).into_service())
689        .serve(addr)
690        .await
691}
692
693pub fn http_app<S>(store: Arc<S>, limits: ServerLimits, auth: AuthConfig) -> Router
694where
695    S: MemoryStore + 'static,
696{
697    let backend = store.as_ref().backend_kind().to_string();
698    http_app_with_runtime(
699        store,
700        ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
701    )
702}
703
704pub fn http_app_with_metrics<S>(
705    store: Arc<S>,
706    limits: ServerLimits,
707    metrics: Arc<ServerMetrics>,
708    auth: AuthConfig,
709) -> Router
710where
711    S: MemoryStore + 'static,
712{
713    let backend = store.as_ref().backend_kind().to_string();
714    http_app_with_runtime(store, ServerRuntime::new(backend, limits, metrics, auth))
715}
716
717pub fn http_app_with_runtime<S>(store: Arc<S>, runtime: ServerRuntime) -> Router
718where
719    S: MemoryStore + 'static,
720{
721    let limits = Arc::clone(runtime.limits());
722    let ready_store = Arc::clone(&store);
723    let upsert_store = Arc::clone(&store);
724    let batch_upsert_store = Arc::clone(&store);
725    let recall_store = Arc::clone(&store);
726    let snapshot_store = Arc::clone(&store);
727    let stats_store = Arc::clone(&store);
728    let integrity_store = Arc::clone(&store);
729    let repair_store = Arc::clone(&store);
730    let compact_store = Arc::clone(&store);
731    let delete_store = Arc::clone(&store);
732    let archive_store = Arc::clone(&store);
733    let suppress_store = Arc::clone(&store);
734    let recover_store = Arc::clone(&store);
735    let export_store = Arc::clone(&store);
736    let import_store = Arc::clone(&store);
737    let ready_runtime = runtime.clone();
738    let upsert_runtime = runtime.clone();
739    let batch_upsert_runtime = runtime.clone();
740    let recall_runtime = runtime.clone();
741    let snapshot_runtime = runtime.clone();
742    let stats_runtime = runtime.clone();
743    let integrity_runtime = runtime.clone();
744    let repair_runtime = runtime.clone();
745    let compact_runtime = runtime.clone();
746    let delete_runtime = runtime.clone();
747    let archive_runtime = runtime.clone();
748    let suppress_runtime = runtime.clone();
749    let recover_runtime = runtime.clone();
750    let traces_runtime = runtime.clone();
751    let trace_runtime = runtime.clone();
752    let export_runtime = runtime.clone();
753    let import_runtime = runtime.clone();
754    let runtime_status_runtime = runtime.clone();
755    let metrics_metrics = Arc::clone(runtime.metrics());
756    let middleware_limits = Arc::clone(&limits);
757    let middleware_metrics = Arc::clone(runtime.metrics());
758    let middleware_auth = Arc::clone(runtime.auth());
759
760    let health_metrics = Arc::clone(runtime.metrics());
761
762    Router::new()
763        .route(
764            "/healthz",
765            get(move || {
766                let metrics = Arc::clone(&health_metrics);
767                async move { healthz(metrics).await }
768            }),
769        )
770        .route(
771            "/readyz",
772            get(move || {
773                let store = Arc::clone(&ready_store);
774                let runtime = ready_runtime.clone();
775                async move { readyz(store, runtime).await }
776            }),
777        )
778        .route(
779            "/memory/upsert",
780            post(
781                move |headers: HeaderMap, Json(request): Json<UpsertRequest>| {
782                    let store = Arc::clone(&upsert_store);
783                    let runtime = upsert_runtime.clone();
784                    async move { upsert_http(store, request, headers, runtime).await }
785                },
786            ),
787        )
788        .route(
789            "/memory/batch-upsert",
790            post(
791                move |headers: HeaderMap, Json(request): Json<BatchUpsertRequest>| {
792                    let store = Arc::clone(&batch_upsert_store);
793                    let runtime = batch_upsert_runtime.clone();
794                    async move { batch_upsert_http(store, request, headers, runtime).await }
795                },
796            ),
797        )
798        .route(
799            "/memory/recall",
800            post(
801                move |headers: HeaderMap, Json(request): Json<RecallQuery>| {
802                    let store = Arc::clone(&recall_store);
803                    let runtime = recall_runtime.clone();
804                    async move { recall_http(store, request, headers, runtime).await }
805                },
806            ),
807        )
808        .route(
809            "/admin/snapshot",
810            get(move |headers: HeaderMap| {
811                let store = Arc::clone(&snapshot_store);
812                let runtime = snapshot_runtime.clone();
813                async move { snapshot_http(store, headers, runtime).await }
814            }),
815        )
816        .route(
817            "/admin/stats",
818            get(
819                move |headers: HeaderMap, Query(request): Query<StoreStatsRequest>| {
820                    let store = Arc::clone(&stats_store);
821                    let runtime = stats_runtime.clone();
822                    async move { stats_http(store, request, headers, runtime).await }
823                },
824            ),
825        )
826        .route(
827            "/admin/integrity",
828            get(
829                move |headers: HeaderMap, Query(request): Query<IntegrityCheckRequest>| {
830                    let store = Arc::clone(&integrity_store);
831                    let runtime = integrity_runtime.clone();
832                    async move { integrity_http(store, request, headers, runtime).await }
833                },
834            ),
835        )
836        .route(
837            "/admin/repair",
838            post(
839                move |headers: HeaderMap, Json(request): Json<RepairRequest>| {
840                    let store = Arc::clone(&repair_store);
841                    let runtime = repair_runtime.clone();
842                    async move { repair_http(store, request, headers, runtime).await }
843                },
844            ),
845        )
846        .route(
847            "/admin/compact",
848            post(
849                move |headers: HeaderMap, Json(request): Json<CompactionRequest>| {
850                    let store = Arc::clone(&compact_store);
851                    let runtime = compact_runtime.clone();
852                    async move { compact_http(store, request, headers, runtime).await }
853                },
854            ),
855        )
856        .route(
857            "/admin/delete",
858            post(
859                move |headers: HeaderMap, Json(request): Json<DeleteHttpRequest>| {
860                    let store = Arc::clone(&delete_store);
861                    let runtime = delete_runtime.clone();
862                    async move { delete_http(store, request, headers, runtime).await }
863                },
864            ),
865        )
866        .route(
867            "/admin/archive",
868            post(
869                move |headers: HeaderMap, Json(request): Json<ArchiveHttpRequest>| {
870                    let store = Arc::clone(&archive_store);
871                    let runtime = archive_runtime.clone();
872                    async move { archive_http(store, request, headers, runtime).await }
873                },
874            ),
875        )
876        .route(
877            "/admin/suppress",
878            post(
879                move |headers: HeaderMap, Json(request): Json<SuppressHttpRequest>| {
880                    let store = Arc::clone(&suppress_store);
881                    let runtime = suppress_runtime.clone();
882                    async move { suppress_http(store, request, headers, runtime).await }
883                },
884            ),
885        )
886        .route(
887            "/admin/recover",
888            post(
889                move |headers: HeaderMap, Json(request): Json<RecoverHttpRequest>| {
890                    let store = Arc::clone(&recover_store);
891                    let runtime = recover_runtime.clone();
892                    async move { recover_http(store, request, headers, runtime).await }
893                },
894            ),
895        )
896        .route(
897            "/admin/traces",
898            get(move |Query(request): Query<TraceListRequest>| {
899                let runtime = traces_runtime.clone();
900                async move { traces_http(request, runtime).await }
901            }),
902        )
903        .route(
904            "/admin/traces/{trace_id}",
905            get(move |Path(trace_id): Path<String>| {
906                let runtime = trace_runtime.clone();
907                async move { trace_http(trace_id, runtime).await }
908            }),
909        )
910        .route(
911            "/admin/runtime",
912            get(move || {
913                let runtime = runtime_status_runtime.clone();
914                async move { runtime_status_http(runtime).await }
915            }),
916        )
917        .route(
918            "/admin/export",
919            post(
920                move |headers: HeaderMap, Json(request): Json<ExportRequest>| {
921                    let store = Arc::clone(&export_store);
922                    let runtime = export_runtime.clone();
923                    async move { export_http(store, request, headers, runtime).await }
924                },
925            ),
926        )
927        .route(
928            "/admin/import",
929            post(
930                move |headers: HeaderMap, Json(request): Json<ImportRequest>| {
931                    let store = Arc::clone(&import_store);
932                    let runtime = import_runtime.clone();
933                    async move { import_http(store, request, headers, runtime).await }
934                },
935            ),
936        )
937        .route(
938            "/metrics",
939            get(move || {
940                let metrics = Arc::clone(&metrics_metrics);
941                async move { metrics_http(metrics).await }
942            }),
943        )
944        .layer(middleware::from_fn(move |request, next| {
945            let limits = Arc::clone(&middleware_limits);
946            let metrics = Arc::clone(&middleware_metrics);
947            let auth = Arc::clone(&middleware_auth);
948            async move { enforce_http_guardrails(request, next, limits, metrics, auth).await }
949        }))
950}
951
952pub async fn serve_http<S>(
953    addr: SocketAddr,
954    store: Arc<S>,
955    limits: ServerLimits,
956    auth: AuthConfig,
957) -> std::io::Result<()>
958where
959    S: MemoryStore + 'static,
960{
961    let backend = store.as_ref().backend_kind().to_string();
962    serve_http_with_runtime(
963        addr,
964        store,
965        ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
966    )
967    .await
968}
969
970pub async fn serve_http_with_runtime<S>(
971    addr: SocketAddr,
972    store: Arc<S>,
973    runtime: ServerRuntime,
974) -> std::io::Result<()>
975where
976    S: MemoryStore + 'static,
977{
978    let listener = tokio::net::TcpListener::bind(addr).await?;
979    axum::serve(listener, http_app_with_runtime(store, runtime)).await
980}
981
982async fn healthz(metrics: Arc<ServerMetrics>) -> Json<HealthStatus> {
983    metrics.http_healthz.fetch_add(1, Ordering::Relaxed);
984    Json(HealthStatus { status: "ok" })
985}
986
987async fn readyz<S>(
988    store: Arc<S>,
989    runtime: ServerRuntime,
990) -> Result<Json<ReadyStatus>, (StatusCode, Json<HttpErrorBody>)>
991where
992    S: MemoryStore + 'static,
993{
994    runtime
995        .metrics()
996        .http_readyz
997        .fetch_add(1, Ordering::Relaxed);
998    let manifest = store.snapshot().await.map_err(map_http_store_error)?;
999    Ok(Json(ReadyStatus {
1000        status: "ready",
1001        record_count: manifest.record_count,
1002        namespaces: manifest.namespaces,
1003    }))
1004}
1005
1006async fn upsert_http<S>(
1007    store: Arc<S>,
1008    request: UpsertRequest,
1009    headers: HeaderMap,
1010    runtime: ServerRuntime,
1011) -> Result<Json<UpsertReceipt>, (StatusCode, Json<HttpErrorBody>)>
1012where
1013    S: MemoryStore + 'static,
1014{
1015    let started_at_unix_ms = now_unix_ms();
1016    let correlation_id = runtime.traces().next_id("corr");
1017    let _permit = runtime
1018        .admission()
1019        .acquire(AdmissionClass::Write, Some(&request.record.scope.tenant_id))
1020        .await
1021        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1022    let receipt = store
1023        .upsert(request.clone())
1024        .await
1025        .map_err(map_http_store_error)?;
1026    record_trace(
1027        &runtime,
1028        TraceOperationKind::Upsert,
1029        "http",
1030        Some(request.record.scope.tenant_id),
1031        Some(request.record.scope.namespace),
1032        http_principal(&headers),
1033        started_at_unix_ms,
1034        TraceStatus::Ok,
1035        None,
1036        OperationTraceSummary {
1037            record_id: Some(request.record.id),
1038            ..OperationTraceSummary::default()
1039        },
1040        None,
1041        correlation_id,
1042    );
1043    Ok(Json(receipt))
1044}
1045
1046async fn batch_upsert_http<S>(
1047    store: Arc<S>,
1048    request: BatchUpsertRequest,
1049    headers: HeaderMap,
1050    runtime: ServerRuntime,
1051) -> Result<Json<Vec<UpsertReceipt>>, (StatusCode, Json<HttpErrorBody>)>
1052where
1053    S: MemoryStore + 'static,
1054{
1055    let started_at_unix_ms = now_unix_ms();
1056    let correlation_id = runtime.traces().next_id("corr");
1057    let tenant_id = request
1058        .requests
1059        .first()
1060        .map(|item| item.record.scope.tenant_id.clone());
1061    let namespace = request
1062        .requests
1063        .first()
1064        .map(|item| item.record.scope.namespace.clone());
1065    let _permit = runtime
1066        .admission()
1067        .acquire(AdmissionClass::Write, tenant_id.as_deref())
1068        .await
1069        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1070    let receipts = store
1071        .batch_upsert(request.clone())
1072        .await
1073        .map_err(map_http_store_error)?;
1074    record_trace(
1075        &runtime,
1076        TraceOperationKind::BatchUpsert,
1077        "http",
1078        tenant_id,
1079        namespace,
1080        http_principal(&headers),
1081        started_at_unix_ms,
1082        TraceStatus::Ok,
1083        None,
1084        OperationTraceSummary {
1085            request_count: Some(request.requests.len() as u32),
1086            ..OperationTraceSummary::default()
1087        },
1088        None,
1089        correlation_id,
1090    );
1091    Ok(Json(receipts))
1092}
1093
1094async fn recall_http<S>(
1095    store: Arc<S>,
1096    request: RecallQuery,
1097    headers: HeaderMap,
1098    runtime: ServerRuntime,
1099) -> Result<Json<RecallResult>, (StatusCode, Json<HttpErrorBody>)>
1100where
1101    S: MemoryStore + 'static,
1102{
1103    let started_at_unix_ms = now_unix_ms();
1104    let correlation_id = runtime.traces().next_id("corr");
1105    let _permit = runtime
1106        .admission()
1107        .acquire(AdmissionClass::Read, Some(&request.scope.tenant_id))
1108        .await
1109        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1110    let mut result = store
1111        .recall(request.clone())
1112        .await
1113        .map_err(map_http_store_error)?;
1114    attach_correlation_id(&mut result, &correlation_id);
1115    record_trace(
1116        &runtime,
1117        TraceOperationKind::Recall,
1118        "http",
1119        Some(request.scope.tenant_id),
1120        Some(request.scope.namespace),
1121        http_principal(&headers),
1122        started_at_unix_ms,
1123        TraceStatus::Ok,
1124        None,
1125        OperationTraceSummary {
1126            query_text: Some(request.query_text),
1127            max_items: Some(request.max_items as u32),
1128            token_budget: request.token_budget.map(|value| value as u32),
1129            ..OperationTraceSummary::default()
1130        },
1131        result.explanation.clone(),
1132        correlation_id,
1133    );
1134    Ok(Json(result))
1135}
1136
1137async fn snapshot_http<S>(
1138    store: Arc<S>,
1139    headers: HeaderMap,
1140    runtime: ServerRuntime,
1141) -> Result<Json<SnapshotManifest>, (StatusCode, Json<HttpErrorBody>)>
1142where
1143    S: MemoryStore + 'static,
1144{
1145    runtime
1146        .metrics()
1147        .http_snapshot
1148        .fetch_add(1, Ordering::Relaxed);
1149    let started_at_unix_ms = now_unix_ms();
1150    let correlation_id = runtime.traces().next_id("corr");
1151    let _permit = runtime
1152        .admission()
1153        .acquire(AdmissionClass::Admin, None)
1154        .await
1155        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1156    let manifest = store.snapshot().await.map_err(map_http_store_error)?;
1157    record_trace(
1158        &runtime,
1159        TraceOperationKind::Snapshot,
1160        "http",
1161        None,
1162        None,
1163        http_principal(&headers),
1164        started_at_unix_ms,
1165        TraceStatus::Ok,
1166        None,
1167        OperationTraceSummary::default(),
1168        None,
1169        correlation_id,
1170    );
1171    Ok(Json(manifest))
1172}
1173
1174async fn stats_http<S>(
1175    store: Arc<S>,
1176    request: StoreStatsRequest,
1177    headers: HeaderMap,
1178    runtime: ServerRuntime,
1179) -> Result<Json<StoreStatsReport>, (StatusCode, Json<HttpErrorBody>)>
1180where
1181    S: MemoryStore + 'static,
1182{
1183    let started_at_unix_ms = now_unix_ms();
1184    let correlation_id = runtime.traces().next_id("corr");
1185    let _permit = runtime
1186        .admission()
1187        .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1188        .await
1189        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1190    let report = store
1191        .stats(request.clone())
1192        .await
1193        .map_err(map_http_store_error)?;
1194    record_trace(
1195        &runtime,
1196        TraceOperationKind::Stats,
1197        "http",
1198        request.tenant_id,
1199        request.namespace,
1200        http_principal(&headers),
1201        started_at_unix_ms,
1202        TraceStatus::Ok,
1203        None,
1204        OperationTraceSummary::default(),
1205        None,
1206        correlation_id,
1207    );
1208    Ok(Json(report))
1209}
1210
1211async fn integrity_http<S>(
1212    store: Arc<S>,
1213    request: IntegrityCheckRequest,
1214    headers: HeaderMap,
1215    runtime: ServerRuntime,
1216) -> Result<Json<IntegrityCheckReport>, (StatusCode, Json<HttpErrorBody>)>
1217where
1218    S: MemoryStore + 'static,
1219{
1220    let started_at_unix_ms = now_unix_ms();
1221    let correlation_id = runtime.traces().next_id("corr");
1222    let _permit = runtime
1223        .admission()
1224        .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1225        .await
1226        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1227    let report = store
1228        .integrity_check(request.clone())
1229        .await
1230        .map_err(map_http_store_error)?;
1231    record_trace(
1232        &runtime,
1233        TraceOperationKind::IntegrityCheck,
1234        "http",
1235        request.tenant_id,
1236        request.namespace,
1237        http_principal(&headers),
1238        started_at_unix_ms,
1239        TraceStatus::Ok,
1240        None,
1241        OperationTraceSummary::default(),
1242        None,
1243        correlation_id,
1244    );
1245    Ok(Json(report))
1246}
1247
1248async fn repair_http<S>(
1249    store: Arc<S>,
1250    request: RepairRequest,
1251    headers: HeaderMap,
1252    runtime: ServerRuntime,
1253) -> Result<Json<RepairReport>, (StatusCode, Json<HttpErrorBody>)>
1254where
1255    S: MemoryStore + 'static,
1256{
1257    let started_at_unix_ms = now_unix_ms();
1258    let correlation_id = runtime.traces().next_id("corr");
1259    let _permit = runtime
1260        .admission()
1261        .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1262        .await
1263        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1264    let report = store
1265        .repair(request.clone())
1266        .await
1267        .map_err(map_http_store_error)?;
1268    record_trace(
1269        &runtime,
1270        TraceOperationKind::Repair,
1271        "http",
1272        request.tenant_id,
1273        request.namespace,
1274        http_principal(&headers),
1275        started_at_unix_ms,
1276        TraceStatus::Ok,
1277        None,
1278        OperationTraceSummary {
1279            dry_run: Some(request.dry_run),
1280            ..OperationTraceSummary::default()
1281        },
1282        None,
1283        correlation_id,
1284    );
1285    Ok(Json(report))
1286}
1287
1288async fn compact_http<S>(
1289    store: Arc<S>,
1290    request: CompactionRequest,
1291    headers: HeaderMap,
1292    runtime: ServerRuntime,
1293) -> Result<Json<CompactionReport>, (StatusCode, Json<HttpErrorBody>)>
1294where
1295    S: MemoryStore + 'static,
1296{
1297    runtime
1298        .metrics()
1299        .http_compact
1300        .fetch_add(1, Ordering::Relaxed);
1301    let started_at_unix_ms = now_unix_ms();
1302    let correlation_id = runtime.traces().next_id("corr");
1303    let _permit = runtime
1304        .admission()
1305        .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1306        .await
1307        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1308    let report = store
1309        .compact(request.clone())
1310        .await
1311        .map_err(map_http_store_error)?;
1312    record_trace(
1313        &runtime,
1314        TraceOperationKind::Compact,
1315        "http",
1316        Some(request.tenant_id),
1317        request.namespace,
1318        http_principal(&headers),
1319        started_at_unix_ms,
1320        TraceStatus::Ok,
1321        None,
1322        OperationTraceSummary {
1323            dry_run: Some(request.dry_run),
1324            ..OperationTraceSummary::default()
1325        },
1326        None,
1327        correlation_id,
1328    );
1329    Ok(Json(report))
1330}
1331
1332async fn delete_http<S>(
1333    store: Arc<S>,
1334    request: DeleteHttpRequest,
1335    headers: HeaderMap,
1336    runtime: ServerRuntime,
1337) -> Result<Json<DeleteReceipt>, (StatusCode, Json<HttpErrorBody>)>
1338where
1339    S: MemoryStore + 'static,
1340{
1341    runtime
1342        .metrics()
1343        .http_delete
1344        .fetch_add(1, Ordering::Relaxed);
1345    let started_at_unix_ms = now_unix_ms();
1346    let correlation_id = runtime.traces().next_id("corr");
1347    let _permit = runtime
1348        .admission()
1349        .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1350        .await
1351        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1352    let receipt = store
1353        .delete(DeleteRequest {
1354            tenant_id: request.tenant_id.clone(),
1355            namespace: request.namespace.clone(),
1356            record_id: request.record_id,
1357            hard_delete: request.hard_delete,
1358            audit_reason: request.audit_reason,
1359        })
1360        .await
1361        .map_err(map_http_store_error)?;
1362    record_trace(
1363        &runtime,
1364        TraceOperationKind::Delete,
1365        "http",
1366        Some(request.tenant_id),
1367        Some(request.namespace),
1368        http_principal(&headers),
1369        started_at_unix_ms,
1370        TraceStatus::Ok,
1371        None,
1372        OperationTraceSummary {
1373            record_id: Some(receipt.record_id.clone()),
1374            ..OperationTraceSummary::default()
1375        },
1376        None,
1377        correlation_id,
1378    );
1379    Ok(Json(receipt))
1380}
1381
1382async fn archive_http<S>(
1383    store: Arc<S>,
1384    request: ArchiveHttpRequest,
1385    headers: HeaderMap,
1386    runtime: ServerRuntime,
1387) -> Result<Json<ArchiveReceipt>, (StatusCode, Json<HttpErrorBody>)>
1388where
1389    S: MemoryStore + 'static,
1390{
1391    runtime
1392        .metrics()
1393        .http_archive
1394        .fetch_add(1, Ordering::Relaxed);
1395    let started_at_unix_ms = now_unix_ms();
1396    let correlation_id = runtime.traces().next_id("corr");
1397    let _permit = runtime
1398        .admission()
1399        .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1400        .await
1401        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1402    let receipt = store
1403        .archive(ArchiveRequest {
1404            tenant_id: request.tenant_id.clone(),
1405            namespace: request.namespace.clone(),
1406            record_id: request.record_id,
1407            dry_run: request.dry_run,
1408            audit_reason: request.audit_reason,
1409        })
1410        .await
1411        .map_err(map_http_store_error)?;
1412    record_trace(
1413        &runtime,
1414        TraceOperationKind::Archive,
1415        "http",
1416        Some(request.tenant_id),
1417        Some(request.namespace),
1418        http_principal(&headers),
1419        started_at_unix_ms,
1420        TraceStatus::Ok,
1421        None,
1422        OperationTraceSummary {
1423            record_id: Some(receipt.record_id.clone()),
1424            dry_run: Some(receipt.dry_run),
1425            ..OperationTraceSummary::default()
1426        },
1427        None,
1428        correlation_id,
1429    );
1430    Ok(Json(receipt))
1431}
1432
1433async fn suppress_http<S>(
1434    store: Arc<S>,
1435    request: SuppressHttpRequest,
1436    headers: HeaderMap,
1437    runtime: ServerRuntime,
1438) -> Result<Json<SuppressReceipt>, (StatusCode, Json<HttpErrorBody>)>
1439where
1440    S: MemoryStore + 'static,
1441{
1442    runtime
1443        .metrics()
1444        .http_suppress
1445        .fetch_add(1, Ordering::Relaxed);
1446    let started_at_unix_ms = now_unix_ms();
1447    let correlation_id = runtime.traces().next_id("corr");
1448    let _permit = runtime
1449        .admission()
1450        .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1451        .await
1452        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1453    let receipt = store
1454        .suppress(SuppressRequest {
1455            tenant_id: request.tenant_id.clone(),
1456            namespace: request.namespace.clone(),
1457            record_id: request.record_id,
1458            dry_run: request.dry_run,
1459            audit_reason: request.audit_reason,
1460        })
1461        .await
1462        .map_err(map_http_store_error)?;
1463    record_trace(
1464        &runtime,
1465        TraceOperationKind::Suppress,
1466        "http",
1467        Some(request.tenant_id),
1468        Some(request.namespace),
1469        http_principal(&headers),
1470        started_at_unix_ms,
1471        TraceStatus::Ok,
1472        None,
1473        OperationTraceSummary {
1474            record_id: Some(receipt.record_id.clone()),
1475            dry_run: Some(receipt.dry_run),
1476            ..OperationTraceSummary::default()
1477        },
1478        None,
1479        correlation_id,
1480    );
1481    Ok(Json(receipt))
1482}
1483
1484async fn recover_http<S>(
1485    store: Arc<S>,
1486    request: RecoverHttpRequest,
1487    headers: HeaderMap,
1488    runtime: ServerRuntime,
1489) -> Result<Json<RecoverReceipt>, (StatusCode, Json<HttpErrorBody>)>
1490where
1491    S: MemoryStore + 'static,
1492{
1493    runtime
1494        .metrics()
1495        .http_recover
1496        .fetch_add(1, Ordering::Relaxed);
1497    let started_at_unix_ms = now_unix_ms();
1498    let correlation_id = runtime.traces().next_id("corr");
1499    let _permit = runtime
1500        .admission()
1501        .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1502        .await
1503        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1504    let receipt = store
1505        .recover(RecoverRequest {
1506            tenant_id: request.tenant_id.clone(),
1507            namespace: request.namespace.clone(),
1508            record_id: request.record_id,
1509            dry_run: request.dry_run,
1510            audit_reason: request.audit_reason,
1511            quality_state: request.quality_state,
1512            historical_state: request.historical_state,
1513        })
1514        .await
1515        .map_err(map_http_store_error)?;
1516    record_trace(
1517        &runtime,
1518        TraceOperationKind::Recover,
1519        "http",
1520        Some(request.tenant_id),
1521        Some(request.namespace),
1522        http_principal(&headers),
1523        started_at_unix_ms,
1524        TraceStatus::Ok,
1525        None,
1526        OperationTraceSummary {
1527            record_id: Some(receipt.record_id.clone()),
1528            dry_run: Some(receipt.dry_run),
1529            ..OperationTraceSummary::default()
1530        },
1531        None,
1532        correlation_id,
1533    );
1534    Ok(Json(receipt))
1535}
1536
1537async fn traces_http(
1538    request: TraceListRequest,
1539    runtime: ServerRuntime,
1540) -> Result<Json<Vec<OperationTrace>>, (StatusCode, Json<HttpErrorBody>)> {
1541    runtime
1542        .metrics()
1543        .http_traces
1544        .fetch_add(1, Ordering::Relaxed);
1545    runtime
1546        .metrics()
1547        .trace_reads
1548        .fetch_add(1, Ordering::Relaxed);
1549    Ok(Json(runtime.traces().list(&request)))
1550}
1551
1552async fn trace_http(
1553    trace_id: String,
1554    runtime: ServerRuntime,
1555) -> Result<Json<OperationTrace>, (StatusCode, Json<HttpErrorBody>)> {
1556    runtime
1557        .metrics()
1558        .http_traces
1559        .fetch_add(1, Ordering::Relaxed);
1560    runtime
1561        .metrics()
1562        .trace_reads
1563        .fetch_add(1, Ordering::Relaxed);
1564    runtime.traces().get(&trace_id).map(Json).ok_or_else(|| {
1565        (
1566            StatusCode::NOT_FOUND,
1567            Json(HttpErrorBody {
1568                error: format!("trace {trace_id} not found"),
1569            }),
1570        )
1571    })
1572}
1573
1574async fn runtime_status_http(runtime: ServerRuntime) -> Json<ServerRuntimeStatus> {
1575    runtime
1576        .metrics()
1577        .http_runtime
1578        .fetch_add(1, Ordering::Relaxed);
1579    Json(ServerRuntimeStatus {
1580        backend: runtime.backend().as_ref().clone(),
1581        admission: runtime.admission().snapshot(),
1582        traces: runtime.traces().snapshot(),
1583    })
1584}
1585
1586async fn export_http<S>(
1587    store: Arc<S>,
1588    request: ExportRequest,
1589    headers: HeaderMap,
1590    runtime: ServerRuntime,
1591) -> Result<Json<PortableStorePackage>, (StatusCode, Json<HttpErrorBody>)>
1592where
1593    S: MemoryStore + 'static,
1594{
1595    runtime
1596        .metrics()
1597        .http_export
1598        .fetch_add(1, Ordering::Relaxed);
1599    let started_at_unix_ms = now_unix_ms();
1600    let correlation_id = runtime.traces().next_id("corr");
1601    let _permit = runtime
1602        .admission()
1603        .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1604        .await
1605        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1606    let package = store
1607        .export(request.clone())
1608        .await
1609        .map_err(map_http_store_error)?;
1610    record_trace(
1611        &runtime,
1612        TraceOperationKind::Export,
1613        "http",
1614        request.tenant_id,
1615        request.namespace,
1616        http_principal(&headers),
1617        started_at_unix_ms,
1618        TraceStatus::Ok,
1619        None,
1620        OperationTraceSummary::default(),
1621        None,
1622        correlation_id,
1623    );
1624    Ok(Json(package))
1625}
1626
1627async fn import_http<S>(
1628    store: Arc<S>,
1629    request: ImportRequest,
1630    headers: HeaderMap,
1631    runtime: ServerRuntime,
1632) -> Result<Json<ImportReport>, (StatusCode, Json<HttpErrorBody>)>
1633where
1634    S: MemoryStore + 'static,
1635{
1636    runtime
1637        .metrics()
1638        .http_import
1639        .fetch_add(1, Ordering::Relaxed);
1640    let started_at_unix_ms = now_unix_ms();
1641    let correlation_id = runtime.traces().next_id("corr");
1642    let tenant_id = request
1643        .package
1644        .records
1645        .first()
1646        .map(|entry| entry.record.scope.tenant_id.clone());
1647    let namespace = request
1648        .package
1649        .records
1650        .first()
1651        .map(|entry| entry.record.scope.namespace.clone());
1652    let _permit = runtime
1653        .admission()
1654        .acquire(AdmissionClass::Admin, tenant_id.as_deref())
1655        .await
1656        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1657    let report = store
1658        .import(request.clone())
1659        .await
1660        .map_err(map_http_store_error)?;
1661    record_trace(
1662        &runtime,
1663        TraceOperationKind::Import,
1664        "http",
1665        tenant_id,
1666        namespace,
1667        http_principal(&headers),
1668        started_at_unix_ms,
1669        if report.failed_records.is_empty() {
1670            TraceStatus::Ok
1671        } else {
1672            TraceStatus::Error
1673        },
1674        (!report.failed_records.is_empty())
1675            .then(|| format!("{} import validation failures", report.failed_records.len())),
1676        OperationTraceSummary {
1677            request_count: Some(request.package.records.len() as u32),
1678            dry_run: Some(request.dry_run),
1679            ..OperationTraceSummary::default()
1680        },
1681        None,
1682        correlation_id,
1683    );
1684    Ok(Json(report))
1685}
1686
1687async fn metrics_http(metrics: Arc<ServerMetrics>) -> impl IntoResponse {
1688    metrics.http_metrics.fetch_add(1, Ordering::Relaxed);
1689    (
1690        [(
1691            axum::http::header::CONTENT_TYPE,
1692            "text/plain; version=0.0.4",
1693        )],
1694        metrics.render(),
1695    )
1696}
1697
1698async fn enforce_http_guardrails(
1699    request: AxumRequest,
1700    next: Next,
1701    limits: Arc<ServerLimits>,
1702    metrics: Arc<ServerMetrics>,
1703    auth: Arc<AuthConfig>,
1704) -> Result<axum::response::Response, StatusCode> {
1705    let path = request.uri().path().to_string();
1706    if let Some(content_length) = request.headers().get(axum::http::header::CONTENT_LENGTH)
1707        && let Ok(content_length) = content_length.to_str()
1708        && let Ok(content_length) = content_length.parse::<usize>()
1709        && content_length > limits.max_http_body_bytes
1710    {
1711        metrics
1712            .http_rejected_body_too_large
1713            .fetch_add(1, Ordering::Relaxed);
1714        return Err(StatusCode::PAYLOAD_TOO_LARGE);
1715    }
1716    validate_http_auth(&request, &path, auth.as_ref())?;
1717    Ok(next.run(request).await)
1718}
1719
1720fn http_principal(headers: &HeaderMap) -> Option<String> {
1721    headers
1722        .get(axum::http::header::AUTHORIZATION)
1723        .and_then(|value| value.to_str().ok())
1724        .map(|_| "bearer".to_string())
1725}
1726
1727fn grpc_principal<T>(request: &Request<T>) -> Option<String> {
1728    request
1729        .metadata()
1730        .get("authorization")
1731        .and_then(|value| value.to_str().ok())
1732        .map(|_| "bearer".to_string())
1733}
1734
1735fn map_http_admission_error(
1736    metrics: &ServerMetrics,
1737    error: AdmissionError,
1738) -> (StatusCode, Json<HttpErrorBody>) {
1739    match error {
1740        AdmissionError::QueueFull | AdmissionError::TenantBusy => {
1741            metrics.admission_rejected.fetch_add(1, Ordering::Relaxed);
1742            (
1743                StatusCode::TOO_MANY_REQUESTS,
1744                Json(HttpErrorBody {
1745                    error: "request admission rejected".to_string(),
1746                }),
1747            )
1748        }
1749        AdmissionError::TimedOut => {
1750            metrics.admission_timed_out.fetch_add(1, Ordering::Relaxed);
1751            (
1752                StatusCode::TOO_MANY_REQUESTS,
1753                Json(HttpErrorBody {
1754                    error: "request admission timed out".to_string(),
1755                }),
1756            )
1757        }
1758    }
1759}
1760
1761fn map_grpc_admission_error(metrics: &ServerMetrics, error: AdmissionError) -> Status {
1762    match error {
1763        AdmissionError::QueueFull | AdmissionError::TenantBusy => {
1764            metrics.admission_rejected.fetch_add(1, Ordering::Relaxed);
1765            Status::resource_exhausted("request admission rejected")
1766        }
1767        AdmissionError::TimedOut => {
1768            metrics.admission_timed_out.fetch_add(1, Ordering::Relaxed);
1769            Status::resource_exhausted("request admission timed out")
1770        }
1771    }
1772}
1773
1774fn attach_correlation_id(result: &mut RecallResult, correlation_id: &str) {
1775    if let Some(explanation) = result.explanation.as_mut() {
1776        if let Some(existing) = explanation.trace_id.replace(correlation_id.to_string()) {
1777            explanation
1778                .policy_notes
1779                .push(format!("planning_trace_id={existing}"));
1780        }
1781        explanation
1782            .policy_notes
1783            .push(format!("correlation_id={correlation_id}"));
1784    }
1785    for hit in &mut result.hits {
1786        if let Some(explanation) = hit.explanation.as_mut() {
1787            if let Some(existing) = explanation.trace_id.replace(correlation_id.to_string()) {
1788                explanation
1789                    .policy_notes
1790                    .push(format!("planning_trace_id={existing}"));
1791            }
1792            explanation
1793                .policy_notes
1794                .push(format!("correlation_id={correlation_id}"));
1795        }
1796    }
1797}
1798
1799#[allow(clippy::too_many_arguments)]
1800fn record_trace(
1801    runtime: &ServerRuntime,
1802    operation: TraceOperationKind,
1803    transport: &str,
1804    tenant_id: Option<String>,
1805    namespace: Option<String>,
1806    principal: Option<String>,
1807    started_at_unix_ms: u64,
1808    status: TraceStatus,
1809    status_message: Option<String>,
1810    summary: OperationTraceSummary,
1811    recall_explanation: Option<RecallExplanation>,
1812    correlation_id: String,
1813) {
1814    let completed_at_unix_ms = now_unix_ms();
1815    let admission_class = admission_class_for_operation(operation.clone()).to_string();
1816    let planning_trace_id = recall_explanation.as_ref().and_then(|value| {
1817        value
1818            .planning_trace
1819            .as_ref()
1820            .map(|trace| trace.trace_id.clone())
1821    });
1822    let store_span_id = runtime.traces().next_id("store-span");
1823    let evicted = runtime.traces().record(OperationTrace {
1824        trace_id: runtime.traces().next_id("trace"),
1825        correlation_id,
1826        operation,
1827        transport: transport.to_string(),
1828        backend: Some(runtime.backend().as_ref().clone()),
1829        admission_class: Some(admission_class),
1830        tenant_id,
1831        namespace,
1832        principal,
1833        store_span_id: Some(store_span_id),
1834        planning_trace_id,
1835        started_at_unix_ms,
1836        completed_at_unix_ms,
1837        latency_ms: completed_at_unix_ms.saturating_sub(started_at_unix_ms),
1838        status,
1839        status_message,
1840        summary,
1841        recall_explanation,
1842    });
1843    runtime
1844        .metrics()
1845        .trace_records
1846        .fetch_add(1, Ordering::Relaxed);
1847    if evicted {
1848        runtime
1849            .metrics()
1850            .trace_evictions
1851            .fetch_add(1, Ordering::Relaxed);
1852    }
1853}
1854
1855fn admission_class_for_operation(operation: TraceOperationKind) -> &'static str {
1856    match operation {
1857        TraceOperationKind::Recall | TraceOperationKind::Snapshot | TraceOperationKind::Stats => {
1858            "read"
1859        }
1860        TraceOperationKind::Upsert
1861        | TraceOperationKind::BatchUpsert
1862        | TraceOperationKind::Compact
1863        | TraceOperationKind::Delete
1864        | TraceOperationKind::Archive
1865        | TraceOperationKind::Suppress
1866        | TraceOperationKind::Recover => "write",
1867        TraceOperationKind::IntegrityCheck
1868        | TraceOperationKind::Repair
1869        | TraceOperationKind::Export
1870        | TraceOperationKind::Import => "admin",
1871    }
1872}
1873
1874fn validate_http_auth(
1875    request: &AxumRequest,
1876    path: &str,
1877    auth: &AuthConfig,
1878) -> Result<(), StatusCode> {
1879    if path == "/healthz" || path == "/readyz" || (path == "/metrics" && !auth.protect_metrics) {
1880        return Ok(());
1881    }
1882    let provided = request
1883        .headers()
1884        .get(axum::http::header::AUTHORIZATION)
1885        .and_then(|value| value.to_str().ok());
1886    let permission = match path {
1887        "/metrics" => AuthPermission::Metrics,
1888        "/memory/recall" => AuthPermission::Read,
1889        "/memory/upsert" | "/memory/batch-upsert" => AuthPermission::Write,
1890        "/admin/stats" | "/admin/integrity" | "/admin/repair" | "/admin/snapshot"
1891        | "/admin/compact" | "/admin/delete" | "/admin/archive" | "/admin/suppress"
1892        | "/admin/recover" | "/admin/runtime" | "/admin/export" | "/admin/import" => {
1893            AuthPermission::Admin
1894        }
1895        _ => AuthPermission::Admin,
1896    };
1897    if path.starts_with("/admin/traces") {
1898        return auth
1899            .authorize(provided, AuthPermission::Admin)
1900            .then_some(())
1901            .ok_or(StatusCode::UNAUTHORIZED);
1902    }
1903    if auth.authorize(provided, permission) {
1904        Ok(())
1905    } else {
1906        Err(StatusCode::UNAUTHORIZED)
1907    }
1908}
1909
1910fn validate_grpc_auth<T>(
1911    request: &Request<T>,
1912    auth: &AuthConfig,
1913    permission: AuthPermission,
1914) -> Result<(), Status> {
1915    let provided = request
1916        .metadata()
1917        .get("authorization")
1918        .and_then(|value| value.to_str().ok());
1919    if auth.authorize(provided, permission) {
1920        Ok(())
1921    } else {
1922        Err(Status::unauthenticated("missing or invalid bearer token"))
1923    }
1924}
1925
/// Extracts the token from an `Authorization: Bearer <token>` header value.
///
/// The scheme name is matched case-insensitively (HTTP authentication scheme
/// names are case-insensitive), so `bearer abc` and `BEARER abc` are accepted
/// alongside `Bearer abc`. Exactly one space must separate the scheme from the
/// token; whatever follows that space is returned unchanged.
///
/// Returns `None` when the header is absent, uses a different scheme, or has
/// no space after the scheme.
fn bearer_token_from_header(header: Option<&str>) -> Option<&str> {
    const SCHEME: &str = "Bearer";

    let value = header?;
    // `get` returns `None` for short input or a non-character boundary, so the
    // slice below cannot panic.
    let scheme = value.get(..SCHEME.len())?;
    if !scheme.eq_ignore_ascii_case(SCHEME) {
        return None;
    }
    value[SCHEME.len()..].strip_prefix(' ')
}
1929
1930fn invalid_argument(message: impl Into<String>) -> Status {
1931    Status::invalid_argument(message.into())
1932}
1933
1934fn internal_status(message: impl Into<String>) -> Status {
1935    Status::internal(message.into())
1936}
1937
1938fn trust_level_from_proto(value: &str) -> Result<MemoryTrustLevel, Status> {
1939    match value {
1940        "" | "derived" => Ok(MemoryTrustLevel::Derived),
1941        "untrusted" => Ok(MemoryTrustLevel::Untrusted),
1942        "observed" => Ok(MemoryTrustLevel::Observed),
1943        "verified" => Ok(MemoryTrustLevel::Verified),
1944        "pinned" => Ok(MemoryTrustLevel::Pinned),
1945        other => Err(invalid_argument(format!("unknown trust level: {other}"))),
1946    }
1947}
1948
1949fn trust_level_to_proto(value: MemoryTrustLevel) -> String {
1950    match value {
1951        MemoryTrustLevel::Untrusted => "untrusted",
1952        MemoryTrustLevel::Observed => "observed",
1953        MemoryTrustLevel::Derived => "derived",
1954        MemoryTrustLevel::Verified => "verified",
1955        MemoryTrustLevel::Pinned => "pinned",
1956    }
1957    .to_string()
1958}
1959
1960fn record_kind_from_proto(value: &str) -> Result<MemoryRecordKind, Status> {
1961    match value {
1962        "episodic" => Ok(MemoryRecordKind::Episodic),
1963        "summary" => Ok(MemoryRecordKind::Summary),
1964        "fact" => Ok(MemoryRecordKind::Fact),
1965        "preference" => Ok(MemoryRecordKind::Preference),
1966        "task" => Ok(MemoryRecordKind::Task),
1967        "artifact" => Ok(MemoryRecordKind::Artifact),
1968        "hypothesis" => Ok(MemoryRecordKind::Hypothesis),
1969        other => Err(invalid_argument(format!("unknown record kind: {other}"))),
1970    }
1971}
1972
1973fn record_kind_to_proto(value: MemoryRecordKind) -> String {
1974    match value {
1975        MemoryRecordKind::Episodic => "episodic",
1976        MemoryRecordKind::Summary => "summary",
1977        MemoryRecordKind::Fact => "fact",
1978        MemoryRecordKind::Preference => "preference",
1979        MemoryRecordKind::Task => "task",
1980        MemoryRecordKind::Artifact => "artifact",
1981        MemoryRecordKind::Hypothesis => "hypothesis",
1982    }
1983    .to_string()
1984}
1985
1986fn quality_state_from_proto(value: &str) -> Result<MemoryQualityState, Status> {
1987    match value {
1988        "draft" => Ok(MemoryQualityState::Draft),
1989        "active" => Ok(MemoryQualityState::Active),
1990        "verified" => Ok(MemoryQualityState::Verified),
1991        "archived" => Ok(MemoryQualityState::Archived),
1992        "suppressed" => Ok(MemoryQualityState::Suppressed),
1993        "deleted" => Ok(MemoryQualityState::Deleted),
1994        other => Err(invalid_argument(format!("unknown quality state: {other}"))),
1995    }
1996}
1997
1998fn quality_state_to_proto(value: MemoryQualityState) -> String {
1999    match value {
2000        MemoryQualityState::Draft => "draft",
2001        MemoryQualityState::Active => "active",
2002        MemoryQualityState::Verified => "verified",
2003        MemoryQualityState::Archived => "archived",
2004        MemoryQualityState::Suppressed => "suppressed",
2005        MemoryQualityState::Deleted => "deleted",
2006    }
2007    .to_string()
2008}
2009
2010fn scope_from_proto(scope: ProtoMemoryScope) -> Result<MemoryScope, Status> {
2011    Ok(MemoryScope {
2012        tenant_id: scope.tenant_id,
2013        namespace: scope.namespace,
2014        actor_id: scope.actor_id,
2015        conversation_id: scope.conversation_id,
2016        session_id: scope.session_id,
2017        source: scope.source,
2018        labels: scope.labels,
2019        trust_level: trust_level_from_proto(&scope.trust_level)?,
2020    })
2021}
2022
2023fn scope_to_proto(scope: MemoryScope) -> ProtoMemoryScope {
2024    ProtoMemoryScope {
2025        tenant_id: scope.tenant_id,
2026        namespace: scope.namespace,
2027        actor_id: scope.actor_id,
2028        conversation_id: scope.conversation_id,
2029        session_id: scope.session_id,
2030        source: scope.source,
2031        labels: scope.labels,
2032        trust_level: trust_level_to_proto(scope.trust_level),
2033    }
2034}
2035
2036fn artifact_from_proto(value: ProtoArtifactPointer) -> ArtifactPointer {
2037    ArtifactPointer {
2038        uri: value.uri,
2039        media_type: value.media_type,
2040        checksum: value.checksum,
2041    }
2042}
2043
2044fn artifact_to_proto(value: ArtifactPointer) -> ProtoArtifactPointer {
2045    ProtoArtifactPointer {
2046        uri: value.uri,
2047        media_type: value.media_type,
2048        checksum: value.checksum,
2049    }
2050}
2051
2052fn continuity_state_from_proto(value: &str) -> Result<EpisodeContinuityState, Status> {
2053    match value {
2054        "" | "open" => Ok(EpisodeContinuityState::Open),
2055        "resolved" => Ok(EpisodeContinuityState::Resolved),
2056        "superseded" => Ok(EpisodeContinuityState::Superseded),
2057        "abandoned" => Ok(EpisodeContinuityState::Abandoned),
2058        other => Err(invalid_argument(format!(
2059            "unknown continuity state: {other}"
2060        ))),
2061    }
2062}
2063
2064fn continuity_state_to_proto(value: EpisodeContinuityState) -> String {
2065    match value {
2066        EpisodeContinuityState::Open => "open",
2067        EpisodeContinuityState::Resolved => "resolved",
2068        EpisodeContinuityState::Superseded => "superseded",
2069        EpisodeContinuityState::Abandoned => "abandoned",
2070    }
2071    .to_string()
2072}
2073
2074fn affective_provenance_from_proto(value: &str) -> Result<AffectiveAnnotationProvenance, Status> {
2075    match value {
2076        "" | "authored" => Ok(AffectiveAnnotationProvenance::Authored),
2077        "imported" => Ok(AffectiveAnnotationProvenance::Imported),
2078        "derived" => Ok(AffectiveAnnotationProvenance::Derived),
2079        other => Err(invalid_argument(format!(
2080            "unknown affective provenance: {other}"
2081        ))),
2082    }
2083}
2084
2085fn affective_provenance_to_proto(value: AffectiveAnnotationProvenance) -> String {
2086    match value {
2087        AffectiveAnnotationProvenance::Authored => "authored",
2088        AffectiveAnnotationProvenance::Imported => "imported",
2089        AffectiveAnnotationProvenance::Derived => "derived",
2090    }
2091    .to_string()
2092}
2093
2094fn temporal_order_from_proto(value: &str) -> Result<RecallTemporalOrder, Status> {
2095    match value {
2096        "" | "relevance" => Ok(RecallTemporalOrder::Relevance),
2097        "chronological_asc" => Ok(RecallTemporalOrder::ChronologicalAsc),
2098        "chronological_desc" => Ok(RecallTemporalOrder::ChronologicalDesc),
2099        other => Err(invalid_argument(format!("unknown temporal order: {other}"))),
2100    }
2101}
2102
2103fn planning_profile_from_proto(value: &str) -> Result<RecallPlanningProfile, Status> {
2104    match value {
2105        "" | "fast_path" => Ok(RecallPlanningProfile::FastPath),
2106        "continuity_aware" => Ok(RecallPlanningProfile::ContinuityAware),
2107        other => Err(invalid_argument(format!(
2108            "unknown planning profile: {other}"
2109        ))),
2110    }
2111}
2112
2113fn planning_profile_to_proto(value: RecallPlanningProfile) -> String {
2114    match value {
2115        RecallPlanningProfile::FastPath => "fast_path",
2116        RecallPlanningProfile::ContinuityAware => "continuity_aware",
2117    }
2118    .to_string()
2119}
2120
2121fn planner_stage_to_proto(value: RecallPlannerStage) -> String {
2122    match value {
2123        RecallPlannerStage::CandidateGeneration => "candidate_generation",
2124        RecallPlannerStage::GraphExpansion => "graph_expansion",
2125        RecallPlannerStage::Selection => "selection",
2126    }
2127    .to_string()
2128}
2129
2130fn candidate_source_to_proto(value: RecallCandidateSource) -> String {
2131    match value {
2132        RecallCandidateSource::Lexical => "lexical",
2133        RecallCandidateSource::Semantic => "semantic",
2134        RecallCandidateSource::Metadata => "metadata",
2135        RecallCandidateSource::Episode => "episode",
2136        RecallCandidateSource::Graph => "graph",
2137        RecallCandidateSource::Temporal => "temporal",
2138        RecallCandidateSource::Provenance => "provenance",
2139    }
2140    .to_string()
2141}
2142
2143fn affective_annotation_from_proto(
2144    value: ProtoAffectiveAnnotation,
2145) -> Result<AffectiveAnnotation, Status> {
2146    Ok(AffectiveAnnotation {
2147        tone: value.tone,
2148        sentiment: value.sentiment,
2149        urgency: value.urgency,
2150        confidence: value.confidence,
2151        tension: value.tension,
2152        provenance: affective_provenance_from_proto(&value.provenance)?,
2153    })
2154}
2155
2156fn affective_annotation_to_proto(value: AffectiveAnnotation) -> ProtoAffectiveAnnotation {
2157    ProtoAffectiveAnnotation {
2158        tone: value.tone,
2159        sentiment: value.sentiment,
2160        urgency: value.urgency,
2161        confidence: value.confidence,
2162        tension: value.tension,
2163        provenance: affective_provenance_to_proto(value.provenance),
2164    }
2165}
2166
2167fn historical_state_from_proto(value: &str) -> Result<MemoryHistoricalState, Status> {
2168    match value {
2169        "" | "current" => Ok(MemoryHistoricalState::Current),
2170        "historical" => Ok(MemoryHistoricalState::Historical),
2171        "superseded" => Ok(MemoryHistoricalState::Superseded),
2172        other => Err(invalid_argument(format!(
2173            "unknown historical state: {other}"
2174        ))),
2175    }
2176}
2177
2178fn historical_state_to_proto(value: MemoryHistoricalState) -> String {
2179    match value {
2180        MemoryHistoricalState::Current => "current",
2181        MemoryHistoricalState::Historical => "historical",
2182        MemoryHistoricalState::Superseded => "superseded",
2183    }
2184    .to_string()
2185}
2186
2187fn lineage_relation_from_proto(value: &str) -> Result<LineageRelationKind, Status> {
2188    match value {
2189        "" | "derived_from" => Ok(LineageRelationKind::DerivedFrom),
2190        "consolidated_from" => Ok(LineageRelationKind::ConsolidatedFrom),
2191        "supersedes" => Ok(LineageRelationKind::Supersedes),
2192        "superseded_by" => Ok(LineageRelationKind::SupersededBy),
2193        "conflicts_with" => Ok(LineageRelationKind::ConflictsWith),
2194        other => Err(invalid_argument(format!(
2195            "unknown lineage relation: {other}"
2196        ))),
2197    }
2198}
2199
2200fn lineage_relation_to_proto(value: LineageRelationKind) -> String {
2201    match value {
2202        LineageRelationKind::DerivedFrom => "derived_from",
2203        LineageRelationKind::ConsolidatedFrom => "consolidated_from",
2204        LineageRelationKind::Supersedes => "supersedes",
2205        LineageRelationKind::SupersededBy => "superseded_by",
2206        LineageRelationKind::ConflictsWith => "conflicts_with",
2207    }
2208    .to_string()
2209}
2210
2211fn lineage_link_from_proto(value: ProtoLineageLink) -> Result<LineageLink, Status> {
2212    Ok(LineageLink {
2213        record_id: value.record_id,
2214        relation: lineage_relation_from_proto(&value.relation)?,
2215        confidence: value.confidence,
2216    })
2217}
2218
2219fn lineage_link_to_proto(value: LineageLink) -> ProtoLineageLink {
2220    ProtoLineageLink {
2221        record_id: value.record_id,
2222        relation: lineage_relation_to_proto(value.relation),
2223        confidence: value.confidence,
2224    }
2225}
2226
2227fn historical_mode_from_proto(value: &str) -> Result<RecallHistoricalMode, Status> {
2228    match value {
2229        "" | "current_only" => Ok(RecallHistoricalMode::CurrentOnly),
2230        "include_historical" => Ok(RecallHistoricalMode::IncludeHistorical),
2231        "historical_only" => Ok(RecallHistoricalMode::HistoricalOnly),
2232        other => Err(invalid_argument(format!(
2233            "unknown historical mode: {other}"
2234        ))),
2235    }
2236}
2237
2238fn episode_context_from_proto(value: ProtoEpisodeContext) -> Result<EpisodeContext, Status> {
2239    Ok(EpisodeContext {
2240        schema_version: if value.schema_version == 0 {
2241            EPISODE_SCHEMA_VERSION
2242        } else {
2243            value.schema_version
2244        },
2245        episode_id: value.episode_id,
2246        summary: value.summary,
2247        continuity_state: continuity_state_from_proto(&value.continuity_state)?,
2248        actor_ids: value.actor_ids,
2249        goal: value.goal,
2250        outcome: value.outcome,
2251        started_at_unix_ms: value.started_at_unix_ms,
2252        ended_at_unix_ms: value.ended_at_unix_ms,
2253        last_active_unix_ms: value.last_active_unix_ms,
2254        recurrence_key: value.recurrence_key,
2255        recurrence_interval_ms: value.recurrence_interval_ms,
2256        boundary_label: value.boundary_label,
2257        previous_record_id: value.previous_record_id,
2258        next_record_id: value.next_record_id,
2259        causal_record_ids: value.causal_record_ids,
2260        related_record_ids: value.related_record_ids,
2261        linked_artifact_uris: value.linked_artifact_uris,
2262        salience: value
2263            .salience
2264            .map_or_else(EpisodeSalience::default, |salience| EpisodeSalience {
2265                reuse_count: salience.reuse_count,
2266                novelty_score: salience.novelty_score,
2267                goal_relevance: salience.goal_relevance,
2268                unresolved_weight: salience.unresolved_weight,
2269            }),
2270        affective: value
2271            .affective
2272            .map(affective_annotation_from_proto)
2273            .transpose()?,
2274    })
2275}
2276
2277fn episode_context_to_proto(value: EpisodeContext) -> ProtoEpisodeContext {
2278    ProtoEpisodeContext {
2279        schema_version: value.schema_version,
2280        episode_id: value.episode_id,
2281        summary: value.summary,
2282        continuity_state: continuity_state_to_proto(value.continuity_state),
2283        actor_ids: value.actor_ids,
2284        goal: value.goal,
2285        outcome: value.outcome,
2286        started_at_unix_ms: value.started_at_unix_ms,
2287        ended_at_unix_ms: value.ended_at_unix_ms,
2288        last_active_unix_ms: value.last_active_unix_ms,
2289        recurrence_key: value.recurrence_key,
2290        recurrence_interval_ms: value.recurrence_interval_ms,
2291        boundary_label: value.boundary_label,
2292        previous_record_id: value.previous_record_id,
2293        next_record_id: value.next_record_id,
2294        causal_record_ids: value.causal_record_ids,
2295        related_record_ids: value.related_record_ids,
2296        linked_artifact_uris: value.linked_artifact_uris,
2297        salience: Some(ProtoEpisodeSalience {
2298            reuse_count: value.salience.reuse_count,
2299            novelty_score: value.salience.novelty_score,
2300            goal_relevance: value.salience.goal_relevance,
2301            unresolved_weight: value.salience.unresolved_weight,
2302        }),
2303        affective: value.affective.map(affective_annotation_to_proto),
2304    }
2305}
2306
2307fn record_from_proto(record: ProtoMemoryRecord) -> Result<MemoryRecord, Status> {
2308    Ok(MemoryRecord {
2309        id: record.id,
2310        scope: scope_from_proto(
2311            record
2312                .scope
2313                .ok_or_else(|| invalid_argument("memory record scope is required"))?,
2314        )?,
2315        kind: record_kind_from_proto(&record.kind)?,
2316        content: record.content,
2317        summary: record.summary,
2318        source_id: record.source_id,
2319        metadata: record.metadata.into_iter().collect(),
2320        quality_state: quality_state_from_proto(&record.quality_state)?,
2321        created_at_unix_ms: record.created_at_unix_ms,
2322        updated_at_unix_ms: record.updated_at_unix_ms,
2323        expires_at_unix_ms: record.expires_at_unix_ms,
2324        importance_score: record.importance_score,
2325        artifact: record.artifact.map(artifact_from_proto),
2326        episode: record.episode.map(episode_context_from_proto).transpose()?,
2327        historical_state: historical_state_from_proto(
2328            record.historical_state.as_deref().unwrap_or(""),
2329        )?,
2330        lineage: record
2331            .lineage
2332            .into_iter()
2333            .map(lineage_link_from_proto)
2334            .collect::<Result<Vec<_>, _>>()?,
2335    })
2336}
2337
2338fn record_to_proto(record: MemoryRecord) -> ProtoMemoryRecord {
2339    ProtoMemoryRecord {
2340        id: record.id,
2341        scope: Some(scope_to_proto(record.scope)),
2342        kind: record_kind_to_proto(record.kind),
2343        content: record.content,
2344        summary: record.summary,
2345        metadata: record.metadata.into_iter().collect(),
2346        quality_state: quality_state_to_proto(record.quality_state),
2347        created_at_unix_ms: record.created_at_unix_ms,
2348        updated_at_unix_ms: record.updated_at_unix_ms,
2349        expires_at_unix_ms: record.expires_at_unix_ms,
2350        importance_score: record.importance_score,
2351        source_id: record.source_id,
2352        artifact: record.artifact.map(artifact_to_proto),
2353        episode: record.episode.map(episode_context_to_proto),
2354        historical_state: Some(historical_state_to_proto(record.historical_state)),
2355        lineage: record
2356            .lineage
2357            .into_iter()
2358            .map(lineage_link_to_proto)
2359            .collect(),
2360    }
2361}
2362
2363fn recall_explanation_to_proto(value: RecallExplanation) -> ProtoRecallExplanation {
2364    ProtoRecallExplanation {
2365        selected_channels: value.selected_channels,
2366        policy_notes: value.policy_notes,
2367        trace_id: value.trace_id,
2368        planning_trace: value.planning_trace.map(recall_planning_trace_to_proto),
2369        scorer_kind: value
2370            .scorer_kind
2371            .map(recall_scorer_kind_to_proto)
2372            .unwrap_or(ProtoRecallScorerKind::Unspecified as i32),
2373        scoring_profile: value
2374            .scoring_profile
2375            .map(recall_scoring_profile_to_proto)
2376            .unwrap_or(ProtoRecallScoringProfile::Unspecified as i32),
2377        planning_profile: value.planning_profile.map(planning_profile_to_proto),
2378        policy_profile: value
2379            .policy_profile
2380            .map(recall_policy_profile_to_proto)
2381            .unwrap_or(ProtoRecallPolicyProfile::Unspecified as i32),
2382    }
2383}
2384
2385fn recall_scorer_kind_to_proto(value: RecallScorerKind) -> i32 {
2386    match value {
2387        RecallScorerKind::Profile => ProtoRecallScorerKind::Profile as i32,
2388        RecallScorerKind::Curated => ProtoRecallScorerKind::Curated as i32,
2389    }
2390}
2391
2392fn recall_scoring_profile_to_proto(value: RecallScoringProfile) -> i32 {
2393    match value {
2394        RecallScoringProfile::Balanced => ProtoRecallScoringProfile::Balanced as i32,
2395        RecallScoringProfile::LexicalFirst => ProtoRecallScoringProfile::LexicalFirst as i32,
2396        RecallScoringProfile::ImportanceFirst => ProtoRecallScoringProfile::ImportanceFirst as i32,
2397    }
2398}
2399
2400fn recall_policy_profile_to_proto(value: RecallPolicyProfile) -> i32 {
2401    match value {
2402        RecallPolicyProfile::General => ProtoRecallPolicyProfile::General as i32,
2403        RecallPolicyProfile::Support => ProtoRecallPolicyProfile::Support as i32,
2404        RecallPolicyProfile::Research => ProtoRecallPolicyProfile::Research as i32,
2405        RecallPolicyProfile::Assistant => ProtoRecallPolicyProfile::Assistant as i32,
2406        RecallPolicyProfile::AutonomousAgent => ProtoRecallPolicyProfile::AutonomousAgent as i32,
2407    }
2408}
2409
2410fn embedding_provider_kind_to_proto(value: EmbeddingProviderKind) -> i32 {
2411    match value {
2412        EmbeddingProviderKind::Disabled => ProtoEmbeddingProviderKind::Disabled as i32,
2413        EmbeddingProviderKind::DeterministicLocal => {
2414            ProtoEmbeddingProviderKind::DeterministicLocal as i32
2415        }
2416    }
2417}
2418
2419fn engine_tuning_info_to_proto(value: EngineTuningInfo) -> ProtoEngineTuningInfo {
2420    ProtoEngineTuningInfo {
2421        recall_scorer_kind: recall_scorer_kind_to_proto(value.recall_scorer_kind),
2422        recall_scoring_profile: recall_scoring_profile_to_proto(value.recall_scoring_profile),
2423        embedding_provider_kind: embedding_provider_kind_to_proto(value.embedding_provider_kind),
2424        embedding_dimensions: value.embedding_dimensions as u64,
2425        graph_expansion_max_hops: u32::from(value.graph_expansion_max_hops),
2426        compaction_summarize_after_record_count: value.compaction_summarize_after_record_count
2427            as u64,
2428        compaction_cold_archive_after_days: value.compaction_cold_archive_after_days,
2429        compaction_cold_archive_importance_threshold_per_mille: u32::from(
2430            value.compaction_cold_archive_importance_threshold_per_mille,
2431        ),
2432        recall_planning_profile: Some(planning_profile_to_proto(value.recall_planning_profile)),
2433        recall_policy_profile: recall_policy_profile_to_proto(value.recall_policy_profile),
2434    }
2435}
2436
2437fn engine_tuning_info_from_proto(value: ProtoEngineTuningInfo) -> Result<EngineTuningInfo, Status> {
2438    Ok(EngineTuningInfo {
2439        recall_scorer_kind: match ProtoRecallScorerKind::try_from(value.recall_scorer_kind)
2440            .unwrap_or(ProtoRecallScorerKind::Unspecified)
2441        {
2442            ProtoRecallScorerKind::Profile => RecallScorerKind::Profile,
2443            ProtoRecallScorerKind::Curated => RecallScorerKind::Curated,
2444            ProtoRecallScorerKind::Unspecified => {
2445                return Err(invalid_argument("engine recall scorer kind is required"));
2446            }
2447        },
2448        recall_scoring_profile: match ProtoRecallScoringProfile::try_from(
2449            value.recall_scoring_profile,
2450        )
2451        .unwrap_or(ProtoRecallScoringProfile::Unspecified)
2452        {
2453            ProtoRecallScoringProfile::Balanced => RecallScoringProfile::Balanced,
2454            ProtoRecallScoringProfile::LexicalFirst => RecallScoringProfile::LexicalFirst,
2455            ProtoRecallScoringProfile::ImportanceFirst => RecallScoringProfile::ImportanceFirst,
2456            ProtoRecallScoringProfile::Unspecified => {
2457                return Err(invalid_argument(
2458                    "engine recall scoring profile is required",
2459                ));
2460            }
2461        },
2462        recall_planning_profile: planning_profile_from_proto(
2463            value.recall_planning_profile.as_deref().unwrap_or(""),
2464        )?,
2465        recall_policy_profile: match ProtoRecallPolicyProfile::try_from(value.recall_policy_profile)
2466            .unwrap_or(ProtoRecallPolicyProfile::Unspecified)
2467        {
2468            ProtoRecallPolicyProfile::General => RecallPolicyProfile::General,
2469            ProtoRecallPolicyProfile::Support => RecallPolicyProfile::Support,
2470            ProtoRecallPolicyProfile::Research => RecallPolicyProfile::Research,
2471            ProtoRecallPolicyProfile::Assistant => RecallPolicyProfile::Assistant,
2472            ProtoRecallPolicyProfile::AutonomousAgent => RecallPolicyProfile::AutonomousAgent,
2473            ProtoRecallPolicyProfile::Unspecified => RecallPolicyProfile::General,
2474        },
2475        embedding_provider_kind: match ProtoEmbeddingProviderKind::try_from(
2476            value.embedding_provider_kind,
2477        )
2478        .unwrap_or(ProtoEmbeddingProviderKind::Unspecified)
2479        {
2480            ProtoEmbeddingProviderKind::Disabled => EmbeddingProviderKind::Disabled,
2481            ProtoEmbeddingProviderKind::DeterministicLocal => {
2482                EmbeddingProviderKind::DeterministicLocal
2483            }
2484            ProtoEmbeddingProviderKind::Unspecified => {
2485                return Err(invalid_argument(
2486                    "engine embedding provider kind is required",
2487                ));
2488            }
2489        },
2490        embedding_dimensions: value.embedding_dimensions as usize,
2491        graph_expansion_max_hops: value.graph_expansion_max_hops as u8,
2492        compaction_summarize_after_record_count: value.compaction_summarize_after_record_count
2493            as usize,
2494        compaction_cold_archive_after_days: value.compaction_cold_archive_after_days,
2495        compaction_cold_archive_importance_threshold_per_mille: value
2496            .compaction_cold_archive_importance_threshold_per_mille
2497            as u16,
2498    })
2499}
2500
2501fn recall_planning_trace_to_proto(value: RecallPlanningTrace) -> ProtoRecallPlanningTrace {
2502    ProtoRecallPlanningTrace {
2503        trace_id: value.trace_id,
2504        token_budget_applied: value.token_budget_applied,
2505        candidates: value
2506            .candidates
2507            .into_iter()
2508            .map(recall_trace_candidate_to_proto)
2509            .collect(),
2510    }
2511}
2512
2513fn recall_trace_candidate_to_proto(value: RecallTraceCandidate) -> ProtoRecallTraceCandidate {
2514    ProtoRecallTraceCandidate {
2515        record_id: value.record_id,
2516        kind: record_kind_to_proto(value.kind),
2517        selected: value.selected,
2518        matched_terms: value.matched_terms,
2519        decision_reason: value.decision_reason,
2520        breakdown: Some(recall_score_breakdown_to_proto(value.breakdown)),
2521        selection_rank: value.selection_rank,
2522        selected_channels: value.selected_channels,
2523        filter_reasons: value.filter_reasons,
2524        candidate_sources: value
2525            .candidate_sources
2526            .into_iter()
2527            .map(candidate_source_to_proto)
2528            .collect(),
2529        planner_stage: Some(planner_stage_to_proto(value.planner_stage)),
2530    }
2531}
2532
2533fn recall_score_breakdown_to_proto(
2534    value: mnemara_core::RecallScoreBreakdown,
2535) -> ProtoRecallScoreBreakdown {
2536    ProtoRecallScoreBreakdown {
2537        lexical: value.lexical,
2538        semantic: value.semantic,
2539        graph: value.graph,
2540        temporal: value.temporal,
2541        policy: value.policy,
2542        total: value.total,
2543        metadata: value.metadata,
2544        curation: value.curation,
2545        episodic: value.episodic,
2546        salience: value.salience,
2547    }
2548}
2549
2550fn namespace_stats_to_proto(value: NamespaceStats) -> ProtoNamespaceStats {
2551    ProtoNamespaceStats {
2552        tenant_id: value.tenant_id,
2553        namespace: value.namespace,
2554        active_records: value.active_records,
2555        archived_records: value.archived_records,
2556        deleted_records: value.deleted_records,
2557        suppressed_records: value.suppressed_records,
2558        pinned_records: value.pinned_records,
2559    }
2560}
2561
2562fn maintenance_stats_to_proto(value: MaintenanceStats) -> ProtoMaintenanceStats {
2563    ProtoMaintenanceStats {
2564        duplicate_candidate_groups: value.duplicate_candidate_groups,
2565        duplicate_candidate_records: value.duplicate_candidate_records,
2566        tombstoned_records: value.tombstoned_records,
2567        expired_records: value.expired_records,
2568        stale_idempotency_keys: value.stale_idempotency_keys,
2569        historical_records: value.historical_records,
2570        superseded_records: value.superseded_records,
2571        lineage_links: value.lineage_links,
2572    }
2573}
2574
2575fn portable_record_to_proto(value: PortableRecord) -> ProtoPortableRecord {
2576    ProtoPortableRecord {
2577        record: Some(record_to_proto(value.record)),
2578        idempotency_key: value.idempotency_key,
2579    }
2580}
2581
2582fn snapshot_manifest_to_proto(value: SnapshotManifest) -> SnapshotReply {
2583    SnapshotReply {
2584        snapshot_id: value.snapshot_id,
2585        created_at_unix_ms: value.created_at_unix_ms,
2586        namespaces: value.namespaces,
2587        record_count: value.record_count,
2588        storage_bytes: value.storage_bytes,
2589        engine: Some(engine_tuning_info_to_proto(value.engine)),
2590    }
2591}
2592
2593fn portable_package_to_proto(value: PortableStorePackage) -> ProtoExportReply {
2594    ProtoExportReply {
2595        package_version: value.package_version,
2596        exported_at_unix_ms: value.exported_at_unix_ms,
2597        manifest: Some(snapshot_manifest_to_proto(value.manifest)),
2598        records: value
2599            .records
2600            .into_iter()
2601            .map(portable_record_to_proto)
2602            .collect(),
2603    }
2604}
2605
2606fn import_mode_from_proto(value: i32) -> Result<ImportMode, Status> {
2607    match ProtoImportMode::try_from(value).unwrap_or(ProtoImportMode::Unspecified) {
2608        ProtoImportMode::Validate => Ok(ImportMode::Validate),
2609        ProtoImportMode::Merge => Ok(ImportMode::Merge),
2610        ProtoImportMode::Replace => Ok(ImportMode::Replace),
2611        ProtoImportMode::Unspecified => Err(invalid_argument("import mode is required")),
2612    }
2613}
2614
2615fn portable_record_from_proto(value: ProtoPortableRecord) -> Result<PortableRecord, Status> {
2616    Ok(PortableRecord {
2617        record: record_from_proto(
2618            value
2619                .record
2620                .ok_or_else(|| invalid_argument("portable record payload is required"))?,
2621        )?,
2622        idempotency_key: value.idempotency_key,
2623    })
2624}
2625
2626fn snapshot_manifest_from_proto(value: SnapshotReply) -> Result<SnapshotManifest, Status> {
2627    Ok(SnapshotManifest {
2628        snapshot_id: value.snapshot_id,
2629        created_at_unix_ms: value.created_at_unix_ms,
2630        namespaces: value.namespaces,
2631        record_count: value.record_count,
2632        storage_bytes: value.storage_bytes,
2633        engine: value
2634            .engine
2635            .map(engine_tuning_info_from_proto)
2636            .transpose()?
2637            .ok_or_else(|| invalid_argument("snapshot engine tuning info is required"))?,
2638    })
2639}
2640
2641fn portable_package_from_proto(value: ProtoExportReply) -> Result<PortableStorePackage, Status> {
2642    Ok(PortableStorePackage {
2643        package_version: value.package_version,
2644        exported_at_unix_ms: value.exported_at_unix_ms,
2645        manifest: snapshot_manifest_from_proto(
2646            value
2647                .manifest
2648                .ok_or_else(|| invalid_argument("portable export manifest is required"))?,
2649        )?,
2650        records: value
2651            .records
2652            .into_iter()
2653            .map(portable_record_from_proto)
2654            .collect::<Result<Vec<_>, _>>()?,
2655    })
2656}
2657
2658fn trace_operation_kind_to_proto(value: TraceOperationKind) -> i32 {
2659    match value {
2660        TraceOperationKind::Upsert => ProtoTraceOperationKind::Upsert as i32,
2661        TraceOperationKind::BatchUpsert => ProtoTraceOperationKind::BatchUpsert as i32,
2662        TraceOperationKind::Recall => ProtoTraceOperationKind::Recall as i32,
2663        TraceOperationKind::Snapshot => ProtoTraceOperationKind::Snapshot as i32,
2664        TraceOperationKind::Stats => ProtoTraceOperationKind::Stats as i32,
2665        TraceOperationKind::IntegrityCheck => ProtoTraceOperationKind::IntegrityCheck as i32,
2666        TraceOperationKind::Repair => ProtoTraceOperationKind::Repair as i32,
2667        TraceOperationKind::Compact => ProtoTraceOperationKind::Compact as i32,
2668        TraceOperationKind::Delete => ProtoTraceOperationKind::Delete as i32,
2669        TraceOperationKind::Archive => ProtoTraceOperationKind::Archive as i32,
2670        TraceOperationKind::Suppress => ProtoTraceOperationKind::Suppress as i32,
2671        TraceOperationKind::Recover => ProtoTraceOperationKind::Recover as i32,
2672        TraceOperationKind::Export => ProtoTraceOperationKind::Export as i32,
2673        TraceOperationKind::Import => ProtoTraceOperationKind::Import as i32,
2674    }
2675}
2676
2677fn trace_operation_kind_from_proto(value: ProtoTraceOperationKind) -> Option<TraceOperationKind> {
2678    match value {
2679        ProtoTraceOperationKind::Upsert => Some(TraceOperationKind::Upsert),
2680        ProtoTraceOperationKind::BatchUpsert => Some(TraceOperationKind::BatchUpsert),
2681        ProtoTraceOperationKind::Recall => Some(TraceOperationKind::Recall),
2682        ProtoTraceOperationKind::Snapshot => Some(TraceOperationKind::Snapshot),
2683        ProtoTraceOperationKind::Stats => Some(TraceOperationKind::Stats),
2684        ProtoTraceOperationKind::IntegrityCheck => Some(TraceOperationKind::IntegrityCheck),
2685        ProtoTraceOperationKind::Repair => Some(TraceOperationKind::Repair),
2686        ProtoTraceOperationKind::Compact => Some(TraceOperationKind::Compact),
2687        ProtoTraceOperationKind::Delete => Some(TraceOperationKind::Delete),
2688        ProtoTraceOperationKind::Archive => Some(TraceOperationKind::Archive),
2689        ProtoTraceOperationKind::Suppress => Some(TraceOperationKind::Suppress),
2690        ProtoTraceOperationKind::Recover => Some(TraceOperationKind::Recover),
2691        ProtoTraceOperationKind::Export => Some(TraceOperationKind::Export),
2692        ProtoTraceOperationKind::Import => Some(TraceOperationKind::Import),
2693        ProtoTraceOperationKind::Unspecified => None,
2694    }
2695}
2696
2697fn trace_status_to_proto(value: TraceStatus) -> i32 {
2698    match value {
2699        TraceStatus::Ok => ProtoTraceStatus::Ok as i32,
2700        TraceStatus::Rejected => ProtoTraceStatus::Rejected as i32,
2701        TraceStatus::Error => ProtoTraceStatus::Error as i32,
2702    }
2703}
2704
2705fn trace_status_from_proto(value: ProtoTraceStatus) -> Option<TraceStatus> {
2706    match value {
2707        ProtoTraceStatus::Ok => Some(TraceStatus::Ok),
2708        ProtoTraceStatus::Rejected => Some(TraceStatus::Rejected),
2709        ProtoTraceStatus::Error => Some(TraceStatus::Error),
2710        ProtoTraceStatus::Unspecified => None,
2711    }
2712}
2713
2714fn operation_trace_to_proto(value: OperationTrace) -> ProtoOperationTrace {
2715    ProtoOperationTrace {
2716        trace_id: value.trace_id,
2717        correlation_id: value.correlation_id,
2718        operation: trace_operation_kind_to_proto(value.operation),
2719        transport: value.transport,
2720        backend: value.backend,
2721        admission_class: value.admission_class,
2722        tenant_id: value.tenant_id,
2723        namespace: value.namespace,
2724        principal: value.principal,
2725        store_span_id: value.store_span_id,
2726        planning_trace_id: value.planning_trace_id,
2727        started_at_unix_ms: value.started_at_unix_ms,
2728        completed_at_unix_ms: value.completed_at_unix_ms,
2729        latency_ms: value.latency_ms,
2730        status: trace_status_to_proto(value.status),
2731        status_message: value.status_message,
2732        summary: Some(ProtoOperationTraceSummary {
2733            record_id: value.summary.record_id,
2734            request_count: value.summary.request_count,
2735            query_text: value.summary.query_text,
2736            max_items: value.summary.max_items,
2737            token_budget: value.summary.token_budget,
2738            dry_run: value.summary.dry_run,
2739        }),
2740        recall_explanation: value.recall_explanation.map(recall_explanation_to_proto),
2741    }
2742}
2743
2744fn kinds_from_proto(values: Vec<String>) -> Result<Vec<MemoryRecordKind>, Status> {
2745    values
2746        .into_iter()
2747        .map(|value| record_kind_from_proto(&value))
2748        .collect()
2749}
2750
2751fn trust_levels_from_proto(values: Vec<String>) -> Result<Vec<MemoryTrustLevel>, Status> {
2752    values
2753        .into_iter()
2754        .map(|value| trust_level_from_proto(&value))
2755        .collect()
2756}
2757
2758fn quality_states_from_proto(values: Vec<String>) -> Result<Vec<MemoryQualityState>, Status> {
2759    values
2760        .into_iter()
2761        .map(|value| quality_state_from_proto(&value))
2762        .collect()
2763}
2764
2765fn recall_filters_from_proto(filters: Option<ProtoRecallFilters>) -> Result<RecallFilters, Status> {
2766    let Some(filters) = filters else {
2767        return Ok(RecallFilters::default());
2768    };
2769
2770    Ok(RecallFilters {
2771        kinds: kinds_from_proto(filters.kinds)?,
2772        required_labels: filters.required_labels,
2773        source: filters.source,
2774        from_unix_ms: filters.from_unix_ms,
2775        to_unix_ms: filters.to_unix_ms,
2776        min_importance_score: filters.min_importance_score,
2777        trust_levels: trust_levels_from_proto(filters.trust_levels)?,
2778        states: quality_states_from_proto(filters.states)?,
2779        include_archived: filters.include_archived,
2780        episode_id: filters.episode_id,
2781        continuity_states: filters
2782            .continuity_states
2783            .into_iter()
2784            .map(|value| continuity_state_from_proto(&value))
2785            .collect::<Result<Vec<_>, _>>()?,
2786        unresolved_only: filters.unresolved_only,
2787        temporal_order: temporal_order_from_proto(filters.temporal_order.as_deref().unwrap_or(""))?,
2788        historical_mode: historical_mode_from_proto(
2789            filters.historical_mode.as_deref().unwrap_or(""),
2790        )?,
2791        lineage_record_id: filters.lineage_record_id,
2792    })
2793}
2794
2795fn validate_scope_limits(scope: &MemoryScope, limits: &ServerLimits) -> Result<(), Status> {
2796    if scope.labels.len() > limits.max_labels_per_scope {
2797        return Err(invalid_argument(format!(
2798            "scope labels exceed configured max of {}",
2799            limits.max_labels_per_scope
2800        )));
2801    }
2802    Ok(())
2803}
2804
2805fn validate_record_limits(record: &MemoryRecord, limits: &ServerLimits) -> Result<(), Status> {
2806    validate_scope_limits(&record.scope, limits)?;
2807    if record.content.len() > limits.max_record_content_bytes {
2808        return Err(invalid_argument(format!(
2809            "record content exceeds configured max of {} bytes",
2810            limits.max_record_content_bytes
2811        )));
2812    }
2813    if record.summary.as_deref().map(str::len).unwrap_or(0) > limits.max_query_text_bytes {
2814        return Err(invalid_argument(format!(
2815            "record summary exceeds configured max of {} bytes",
2816            limits.max_query_text_bytes
2817        )));
2818    }
2819    Ok(())
2820}
2821
2822fn validate_recall_limits(query: &RecallQuery, limits: &ServerLimits) -> Result<(), Status> {
2823    validate_scope_limits(&query.scope, limits)?;
2824    if query.max_items == 0 {
2825        return Err(invalid_argument("recall max_items must be greater than 0"));
2826    }
2827    if query.max_items > limits.max_recall_items {
2828        return Err(invalid_argument(format!(
2829            "recall max_items exceeds configured max of {}",
2830            limits.max_recall_items
2831        )));
2832    }
2833    if query.query_text.len() > limits.max_query_text_bytes {
2834        return Err(invalid_argument(format!(
2835            "query text exceeds configured max of {} bytes",
2836            limits.max_query_text_bytes
2837        )));
2838    }
2839    Ok(())
2840}
2841
2842fn map_store_error(error: mnemara_core::Error) -> Status {
2843    match error {
2844        mnemara_core::Error::InvalidConfig(message)
2845        | mnemara_core::Error::InvalidRequest(message) => invalid_argument(message),
2846        mnemara_core::Error::Conflict(message) => Status::already_exists(message),
2847        mnemara_core::Error::Unsupported(message) => Status::unimplemented(message),
2848        mnemara_core::Error::Backend(message) => internal_status(message),
2849    }
2850}
2851
2852fn map_http_store_error(error: mnemara_core::Error) -> (StatusCode, Json<HttpErrorBody>) {
2853    let status = match error {
2854        mnemara_core::Error::InvalidConfig(_) | mnemara_core::Error::InvalidRequest(_) => {
2855            StatusCode::BAD_REQUEST
2856        }
2857        mnemara_core::Error::Conflict(_) => StatusCode::CONFLICT,
2858        mnemara_core::Error::Unsupported(_) => StatusCode::NOT_IMPLEMENTED,
2859        mnemara_core::Error::Backend(_) => StatusCode::INTERNAL_SERVER_ERROR,
2860    };
2861    (
2862        status,
2863        Json(HttpErrorBody {
2864            error: error.to_string(),
2865        }),
2866    )
2867}
2868
2869fn record_grpc_result<T>(
2870    metrics: &MethodMetrics,
2871    result: Result<Response<T>, Status>,
2872) -> Result<Response<T>, Status> {
2873    match result {
2874        Ok(response) => {
2875            metrics.record_status(&Status::ok(""));
2876            Ok(response)
2877        }
2878        Err(status) => {
2879            metrics.record_status(&status);
2880            Err(status)
2881        }
2882    }
2883}
2884
2885#[tonic::async_trait]
2886impl<S> MemoryService for GrpcMemoryService<S>
2887where
2888    S: MemoryStore + 'static,
2889{
2890    async fn upsert_memory_record(
2891        &self,
2892        request: Request<ProtoUpsertMemoryRecordRequest>,
2893    ) -> Result<Response<UpsertMemoryRecordReply>, Status> {
2894        self.runtime.metrics().grpc_upsert.record_started();
2895        validate_grpc_auth(
2896            &request,
2897            self.runtime.auth().as_ref(),
2898            AuthPermission::Write,
2899        )?;
2900        let principal = grpc_principal(&request);
2901        let started_at_unix_ms = now_unix_ms();
2902        let correlation_id = self.runtime.traces().next_id("corr");
2903        let result = async {
2904            let request = request.into_inner();
2905            let record = record_from_proto(
2906                request
2907                    .record
2908                    .ok_or_else(|| invalid_argument("memory record is required"))?,
2909            )?;
2910            let tenant_id = record.scope.tenant_id.clone();
2911            let namespace = record.scope.namespace.clone();
2912            validate_record_limits(&record, self.runtime.limits().as_ref())?;
2913            let _permit = self
2914                .runtime
2915                .admission()
2916                .acquire(AdmissionClass::Write, Some(&tenant_id))
2917                .await
2918                .map_err(|error| {
2919                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
2920                })?;
2921            let receipt = self
2922                .store
2923                .upsert(mnemara_core::UpsertRequest {
2924                    record,
2925                    idempotency_key: request.idempotency_key,
2926                })
2927                .await
2928                .map_err(map_store_error)?;
2929            record_trace(
2930                &self.runtime,
2931                TraceOperationKind::Upsert,
2932                "grpc",
2933                Some(tenant_id),
2934                Some(namespace),
2935                principal,
2936                started_at_unix_ms,
2937                TraceStatus::Ok,
2938                None,
2939                OperationTraceSummary {
2940                    record_id: Some(receipt.record_id.clone()),
2941                    ..OperationTraceSummary::default()
2942                },
2943                None,
2944                correlation_id,
2945            );
2946
2947            Ok(Response::new(UpsertMemoryRecordReply {
2948                record_id: receipt.record_id,
2949                deduplicated: receipt.deduplicated,
2950                summary_refreshed: receipt.summary_refreshed,
2951            }))
2952        }
2953        .await;
2954        record_grpc_result(&self.runtime.metrics().grpc_upsert, result)
2955    }
2956
2957    async fn batch_upsert_memory_records(
2958        &self,
2959        request: Request<BatchUpsertMemoryRecordsRequest>,
2960    ) -> Result<Response<BatchUpsertMemoryRecordsReply>, Status> {
2961        self.runtime.metrics().grpc_batch_upsert.record_started();
2962        validate_grpc_auth(
2963            &request,
2964            self.runtime.auth().as_ref(),
2965            AuthPermission::Write,
2966        )?;
2967        let principal = grpc_principal(&request);
2968        let started_at_unix_ms = now_unix_ms();
2969        let correlation_id = self.runtime.traces().next_id("corr");
2970        let result = async {
2971            let request = request.into_inner();
2972            if request.requests.len() > self.runtime.limits().max_batch_upsert_requests {
2973                return Err(invalid_argument(format!(
2974                    "batch upsert request count exceeds configured max of {}",
2975                    self.runtime.limits().max_batch_upsert_requests
2976                )));
2977            }
2978            let requests = request
2979                .requests
2980                .into_iter()
2981                .map(|request| {
2982                    let record = record_from_proto(
2983                        request
2984                            .record
2985                            .ok_or_else(|| invalid_argument("memory record is required"))?,
2986                    )?;
2987                    validate_record_limits(&record, self.runtime.limits().as_ref())?;
2988                    Ok(mnemara_core::UpsertRequest {
2989                        record,
2990                        idempotency_key: request.idempotency_key,
2991                    })
2992                })
2993                .collect::<Result<Vec<_>, Status>>()?;
2994            let tenant_id = requests
2995                .first()
2996                .map(|item| item.record.scope.tenant_id.clone());
2997            let namespace = requests
2998                .first()
2999                .map(|item| item.record.scope.namespace.clone());
3000            let _permit = self
3001                .runtime
3002                .admission()
3003                .acquire(AdmissionClass::Write, tenant_id.as_deref())
3004                .await
3005                .map_err(|error| {
3006                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3007                })?;
3008
3009            let receipts: Vec<UpsertMemoryRecordReply> = self
3010                .store
3011                .batch_upsert(BatchUpsertRequest { requests })
3012                .await
3013                .map_err(map_store_error)?
3014                .into_iter()
3015                .map(|receipt| UpsertMemoryRecordReply {
3016                    record_id: receipt.record_id,
3017                    deduplicated: receipt.deduplicated,
3018                    summary_refreshed: receipt.summary_refreshed,
3019                })
3020                .collect();
3021            record_trace(
3022                &self.runtime,
3023                TraceOperationKind::BatchUpsert,
3024                "grpc",
3025                tenant_id,
3026                namespace,
3027                principal,
3028                started_at_unix_ms,
3029                TraceStatus::Ok,
3030                None,
3031                OperationTraceSummary {
3032                    request_count: Some(receipts.len() as u32),
3033                    ..OperationTraceSummary::default()
3034                },
3035                None,
3036                correlation_id,
3037            );
3038
3039            Ok(Response::new(BatchUpsertMemoryRecordsReply { receipts }))
3040        }
3041        .await;
3042        record_grpc_result(&self.runtime.metrics().grpc_batch_upsert, result)
3043    }
3044
3045    async fn recall(
3046        &self,
3047        request: Request<ProtoRecallRequest>,
3048    ) -> Result<Response<RecallReply>, Status> {
3049        self.runtime.metrics().grpc_recall.record_started();
3050        validate_grpc_auth(&request, self.runtime.auth().as_ref(), AuthPermission::Read)?;
3051        let principal = grpc_principal(&request);
3052        let started_at_unix_ms = now_unix_ms();
3053        let correlation_id = self.runtime.traces().next_id("corr");
3054        let result = async {
3055            let request = request.into_inner();
3056            let query = RecallQuery {
3057                scope: scope_from_proto(
3058                    request
3059                        .scope
3060                        .ok_or_else(|| invalid_argument("recall scope is required"))?,
3061                )?,
3062                query_text: request.query_text,
3063                max_items: request.max_items as usize,
3064                token_budget: request.token_budget.map(|value| value as usize),
3065                filters: recall_filters_from_proto(request.filters)?,
3066                include_explanation: request.include_explanation,
3067            };
3068            validate_recall_limits(&query, self.runtime.limits().as_ref())?;
3069            let tenant_id = query.scope.tenant_id.clone();
3070            let namespace = query.scope.namespace.clone();
3071            let query_text = query.query_text.clone();
3072            let max_items = query.max_items as u32;
3073            let token_budget = query.token_budget.map(|value| value as u32);
3074            let _permit = self
3075                .runtime
3076                .admission()
3077                .acquire(AdmissionClass::Read, Some(&tenant_id))
3078                .await
3079                .map_err(|error| {
3080                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3081                })?;
3082            let mut result = self.store.recall(query).await.map_err(map_store_error)?;
3083            attach_correlation_id(&mut result, &correlation_id);
3084            let recall_explanation = result.explanation.clone();
3085
3086            let hits = result
3087                .hits
3088                .into_iter()
3089                .map(|hit| ProtoRecallHit {
3090                    record: Some(record_to_proto(hit.record)),
3091                    breakdown: Some(recall_score_breakdown_to_proto(hit.breakdown)),
3092                    selected_channels: hit
3093                        .explanation
3094                        .as_ref()
3095                        .map(|value| value.selected_channels.clone())
3096                        .unwrap_or_default(),
3097                    policy_notes: hit
3098                        .explanation
3099                        .as_ref()
3100                        .map(|value| value.policy_notes.clone())
3101                        .unwrap_or_default(),
3102                    explanation: hit.explanation.map(recall_explanation_to_proto),
3103                })
3104                .collect();
3105            record_trace(
3106                &self.runtime,
3107                TraceOperationKind::Recall,
3108                "grpc",
3109                Some(tenant_id),
3110                Some(namespace),
3111                principal,
3112                started_at_unix_ms,
3113                TraceStatus::Ok,
3114                None,
3115                OperationTraceSummary {
3116                    query_text: Some(query_text),
3117                    max_items: Some(max_items),
3118                    token_budget,
3119                    ..OperationTraceSummary::default()
3120                },
3121                recall_explanation,
3122                correlation_id,
3123            );
3124
3125            Ok(Response::new(RecallReply {
3126                hits,
3127                total_candidates_examined: result.total_candidates_examined as u32,
3128                explanation: result.explanation.map(recall_explanation_to_proto),
3129            }))
3130        }
3131        .await;
3132        record_grpc_result(&self.runtime.metrics().grpc_recall, result)
3133    }
3134
3135    async fn compact(
3136        &self,
3137        request: Request<ProtoCompactRequest>,
3138    ) -> Result<Response<CompactReply>, Status> {
3139        self.runtime.metrics().grpc_compact.record_started();
3140        validate_grpc_auth(
3141            &request,
3142            self.runtime.auth().as_ref(),
3143            AuthPermission::Admin,
3144        )?;
3145        let principal = grpc_principal(&request);
3146        let started_at_unix_ms = now_unix_ms();
3147        let correlation_id = self.runtime.traces().next_id("corr");
3148        let result = async {
3149            let request = request.into_inner();
3150            let tenant_id = request.tenant_id.clone();
3151            let namespace = request.namespace.clone();
3152            let _permit = self
3153                .runtime
3154                .admission()
3155                .acquire(AdmissionClass::Admin, Some(&tenant_id))
3156                .await
3157                .map_err(|error| {
3158                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3159                })?;
3160            let report = self
3161                .store
3162                .compact(CompactionRequest {
3163                    tenant_id,
3164                    namespace: namespace.clone(),
3165                    dry_run: request.dry_run,
3166                    reason: request.reason,
3167                })
3168                .await
3169                .map_err(map_store_error)?;
3170            record_trace(
3171                &self.runtime,
3172                TraceOperationKind::Compact,
3173                "grpc",
3174                Some(request.tenant_id),
3175                namespace,
3176                principal,
3177                started_at_unix_ms,
3178                TraceStatus::Ok,
3179                None,
3180                OperationTraceSummary {
3181                    dry_run: Some(request.dry_run),
3182                    ..OperationTraceSummary::default()
3183                },
3184                None,
3185                correlation_id,
3186            );
3187
3188            Ok(Response::new(CompactReply {
3189                deduplicated_records: report.deduplicated_records,
3190                archived_records: report.archived_records,
3191                summarized_clusters: report.summarized_clusters,
3192                pruned_graph_edges: report.pruned_graph_edges,
3193                superseded_records: report.superseded_records,
3194                lineage_links_created: report.lineage_links_created,
3195                dry_run: report.dry_run,
3196            }))
3197        }
3198        .await;
3199        record_grpc_result(&self.runtime.metrics().grpc_compact, result)
3200    }
3201
3202    async fn snapshot(
3203        &self,
3204        request: Request<SnapshotRequest>,
3205    ) -> Result<Response<SnapshotReply>, Status> {
3206        self.runtime.metrics().grpc_snapshot.record_started();
3207        validate_grpc_auth(
3208            &request,
3209            self.runtime.auth().as_ref(),
3210            AuthPermission::Admin,
3211        )?;
3212        let principal = grpc_principal(&request);
3213        let started_at_unix_ms = now_unix_ms();
3214        let correlation_id = self.runtime.traces().next_id("corr");
3215        let result = async {
3216            let _permit = self
3217                .runtime
3218                .admission()
3219                .acquire(AdmissionClass::Admin, None)
3220                .await
3221                .map_err(|error| {
3222                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3223                })?;
3224            let manifest = self.store.snapshot().await.map_err(map_store_error)?;
3225            record_trace(
3226                &self.runtime,
3227                TraceOperationKind::Snapshot,
3228                "grpc",
3229                None,
3230                None,
3231                principal,
3232                started_at_unix_ms,
3233                TraceStatus::Ok,
3234                None,
3235                OperationTraceSummary::default(),
3236                None,
3237                correlation_id,
3238            );
3239            Ok(Response::new(SnapshotReply {
3240                snapshot_id: manifest.snapshot_id,
3241                created_at_unix_ms: manifest.created_at_unix_ms,
3242                namespaces: manifest.namespaces,
3243                record_count: manifest.record_count,
3244                storage_bytes: manifest.storage_bytes,
3245                engine: Some(engine_tuning_info_to_proto(manifest.engine)),
3246            }))
3247        }
3248        .await;
3249        record_grpc_result(&self.runtime.metrics().grpc_snapshot, result)
3250    }
3251
3252    async fn delete(
3253        &self,
3254        request: Request<ProtoDeleteRequest>,
3255    ) -> Result<Response<DeleteReply>, Status> {
3256        self.runtime.metrics().grpc_delete.record_started();
3257        validate_grpc_auth(
3258            &request,
3259            self.runtime.auth().as_ref(),
3260            AuthPermission::Admin,
3261        )?;
3262        let principal = grpc_principal(&request);
3263        let started_at_unix_ms = now_unix_ms();
3264        let correlation_id = self.runtime.traces().next_id("corr");
3265        let result = async {
3266            let request = request.into_inner();
3267            let tenant_id = request.tenant_id.clone();
3268            let namespace = request.namespace.clone();
3269            let _permit = self
3270                .runtime
3271                .admission()
3272                .acquire(AdmissionClass::Admin, Some(&tenant_id))
3273                .await
3274                .map_err(|error| {
3275                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3276                })?;
3277            let receipt = self
3278                .store
3279                .delete(DeleteRequest {
3280                    tenant_id,
3281                    namespace: namespace.clone(),
3282                    record_id: request.record_id,
3283                    hard_delete: request.hard_delete,
3284                    audit_reason: request.audit_reason,
3285                })
3286                .await
3287                .map_err(map_store_error)?;
3288            record_trace(
3289                &self.runtime,
3290                TraceOperationKind::Delete,
3291                "grpc",
3292                Some(request.tenant_id),
3293                Some(namespace),
3294                principal,
3295                started_at_unix_ms,
3296                TraceStatus::Ok,
3297                None,
3298                OperationTraceSummary {
3299                    record_id: Some(receipt.record_id.clone()),
3300                    ..OperationTraceSummary::default()
3301                },
3302                None,
3303                correlation_id,
3304            );
3305
3306            Ok(Response::new(DeleteReply {
3307                record_id: receipt.record_id,
3308                tombstoned: receipt.tombstoned,
3309                hard_deleted: receipt.hard_deleted,
3310            }))
3311        }
3312        .await;
3313        record_grpc_result(&self.runtime.metrics().grpc_delete, result)
3314    }
3315
3316    async fn archive(
3317        &self,
3318        request: Request<ProtoArchiveRequest>,
3319    ) -> Result<Response<ArchiveReply>, Status> {
3320        self.runtime.metrics().grpc_archive.record_started();
3321        validate_grpc_auth(
3322            &request,
3323            self.runtime.auth().as_ref(),
3324            AuthPermission::Admin,
3325        )?;
3326        let principal = grpc_principal(&request);
3327        let started_at_unix_ms = now_unix_ms();
3328        let correlation_id = self.runtime.traces().next_id("corr");
3329        let result = async {
3330            let request = request.into_inner();
3331            let tenant_id = request.tenant_id.clone();
3332            let namespace = request.namespace.clone();
3333            let _permit = self
3334                .runtime
3335                .admission()
3336                .acquire(AdmissionClass::Admin, Some(&tenant_id))
3337                .await
3338                .map_err(|error| {
3339                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3340                })?;
3341            let receipt = self
3342                .store
3343                .archive(ArchiveRequest {
3344                    tenant_id,
3345                    namespace: namespace.clone(),
3346                    record_id: request.record_id,
3347                    dry_run: request.dry_run,
3348                    audit_reason: request.audit_reason,
3349                })
3350                .await
3351                .map_err(map_store_error)?;
3352            record_trace(
3353                &self.runtime,
3354                TraceOperationKind::Archive,
3355                "grpc",
3356                Some(request.tenant_id),
3357                Some(namespace),
3358                principal,
3359                started_at_unix_ms,
3360                TraceStatus::Ok,
3361                None,
3362                OperationTraceSummary {
3363                    record_id: Some(receipt.record_id.clone()),
3364                    dry_run: Some(receipt.dry_run),
3365                    ..OperationTraceSummary::default()
3366                },
3367                None,
3368                correlation_id,
3369            );
3370
3371            Ok(Response::new(ArchiveReply {
3372                record_id: receipt.record_id,
3373                previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3374                previous_historical_state: historical_state_to_proto(
3375                    receipt.previous_historical_state,
3376                ),
3377                quality_state: quality_state_to_proto(receipt.quality_state),
3378                historical_state: historical_state_to_proto(receipt.historical_state),
3379                changed: receipt.changed,
3380                dry_run: receipt.dry_run,
3381            }))
3382        }
3383        .await;
3384        record_grpc_result(&self.runtime.metrics().grpc_archive, result)
3385    }
3386
3387    async fn suppress(
3388        &self,
3389        request: Request<ProtoSuppressRequest>,
3390    ) -> Result<Response<SuppressReply>, Status> {
3391        self.runtime.metrics().grpc_suppress.record_started();
3392        validate_grpc_auth(
3393            &request,
3394            self.runtime.auth().as_ref(),
3395            AuthPermission::Admin,
3396        )?;
3397        let principal = grpc_principal(&request);
3398        let started_at_unix_ms = now_unix_ms();
3399        let correlation_id = self.runtime.traces().next_id("corr");
3400        let result = async {
3401            let request = request.into_inner();
3402            let tenant_id = request.tenant_id.clone();
3403            let namespace = request.namespace.clone();
3404            let _permit = self
3405                .runtime
3406                .admission()
3407                .acquire(AdmissionClass::Admin, Some(&tenant_id))
3408                .await
3409                .map_err(|error| {
3410                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3411                })?;
3412            let receipt = self
3413                .store
3414                .suppress(SuppressRequest {
3415                    tenant_id,
3416                    namespace: namespace.clone(),
3417                    record_id: request.record_id,
3418                    dry_run: request.dry_run,
3419                    audit_reason: request.audit_reason,
3420                })
3421                .await
3422                .map_err(map_store_error)?;
3423            record_trace(
3424                &self.runtime,
3425                TraceOperationKind::Suppress,
3426                "grpc",
3427                Some(request.tenant_id),
3428                Some(namespace),
3429                principal,
3430                started_at_unix_ms,
3431                TraceStatus::Ok,
3432                None,
3433                OperationTraceSummary {
3434                    record_id: Some(receipt.record_id.clone()),
3435                    dry_run: Some(receipt.dry_run),
3436                    ..OperationTraceSummary::default()
3437                },
3438                None,
3439                correlation_id,
3440            );
3441
3442            Ok(Response::new(SuppressReply {
3443                record_id: receipt.record_id,
3444                previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3445                previous_historical_state: historical_state_to_proto(
3446                    receipt.previous_historical_state,
3447                ),
3448                quality_state: quality_state_to_proto(receipt.quality_state),
3449                historical_state: historical_state_to_proto(receipt.historical_state),
3450                changed: receipt.changed,
3451                dry_run: receipt.dry_run,
3452            }))
3453        }
3454        .await;
3455        record_grpc_result(&self.runtime.metrics().grpc_suppress, result)
3456    }
3457
3458    async fn recover(
3459        &self,
3460        request: Request<ProtoRecoverRequest>,
3461    ) -> Result<Response<RecoverReply>, Status> {
3462        self.runtime.metrics().grpc_recover.record_started();
3463        validate_grpc_auth(
3464            &request,
3465            self.runtime.auth().as_ref(),
3466            AuthPermission::Admin,
3467        )?;
3468        let principal = grpc_principal(&request);
3469        let started_at_unix_ms = now_unix_ms();
3470        let correlation_id = self.runtime.traces().next_id("corr");
3471        let result = async {
3472            let request = request.into_inner();
3473            let tenant_id = request.tenant_id.clone();
3474            let namespace = request.namespace.clone();
3475            let _permit = self
3476                .runtime
3477                .admission()
3478                .acquire(AdmissionClass::Admin, Some(&tenant_id))
3479                .await
3480                .map_err(|error| {
3481                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3482                })?;
3483            let receipt = self
3484                .store
3485                .recover(RecoverRequest {
3486                    tenant_id,
3487                    namespace: namespace.clone(),
3488                    record_id: request.record_id,
3489                    dry_run: request.dry_run,
3490                    audit_reason: request.audit_reason,
3491                    quality_state: quality_state_from_proto(&request.quality_state)?,
3492                    historical_state: request
3493                        .historical_state
3494                        .as_deref()
3495                        .map(historical_state_from_proto)
3496                        .transpose()?,
3497                })
3498                .await
3499                .map_err(map_store_error)?;
3500            record_trace(
3501                &self.runtime,
3502                TraceOperationKind::Recover,
3503                "grpc",
3504                Some(request.tenant_id),
3505                Some(namespace),
3506                principal,
3507                started_at_unix_ms,
3508                TraceStatus::Ok,
3509                None,
3510                OperationTraceSummary {
3511                    record_id: Some(receipt.record_id.clone()),
3512                    dry_run: Some(receipt.dry_run),
3513                    ..OperationTraceSummary::default()
3514                },
3515                None,
3516                correlation_id,
3517            );
3518
3519            Ok(Response::new(RecoverReply {
3520                record_id: receipt.record_id,
3521                previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3522                previous_historical_state: historical_state_to_proto(
3523                    receipt.previous_historical_state,
3524                ),
3525                quality_state: quality_state_to_proto(receipt.quality_state),
3526                historical_state: historical_state_to_proto(receipt.historical_state),
3527                changed: receipt.changed,
3528                dry_run: receipt.dry_run,
3529            }))
3530        }
3531        .await;
3532        record_grpc_result(&self.runtime.metrics().grpc_recover, result)
3533    }
3534
3535    async fn stats(
3536        &self,
3537        request: Request<ProtoStoreStatsRequest>,
3538    ) -> Result<Response<ProtoStoreStatsReply>, Status> {
3539        self.runtime.metrics().grpc_stats.record_started();
3540        validate_grpc_auth(
3541            &request,
3542            self.runtime.auth().as_ref(),
3543            AuthPermission::Admin,
3544        )?;
3545        let principal = grpc_principal(&request);
3546        let started_at_unix_ms = now_unix_ms();
3547        let correlation_id = self.runtime.traces().next_id("corr");
3548        let result = async {
3549            let request = request.into_inner();
3550            let tenant_id = request.tenant_id.clone();
3551            let namespace = request.namespace.clone();
3552            let _permit = self
3553                .runtime
3554                .admission()
3555                .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3556                .await
3557                .map_err(|error| {
3558                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3559                })?;
3560            let report = self
3561                .store
3562                .stats(StoreStatsRequest {
3563                    tenant_id: tenant_id.clone(),
3564                    namespace: namespace.clone(),
3565                })
3566                .await
3567                .map_err(map_store_error)?;
3568            record_trace(
3569                &self.runtime,
3570                TraceOperationKind::Stats,
3571                "grpc",
3572                tenant_id,
3573                namespace,
3574                principal,
3575                started_at_unix_ms,
3576                TraceStatus::Ok,
3577                None,
3578                OperationTraceSummary::default(),
3579                None,
3580                correlation_id,
3581            );
3582
3583            Ok(Response::new(ProtoStoreStatsReply {
3584                generated_at_unix_ms: report.generated_at_unix_ms,
3585                total_records: report.total_records,
3586                storage_bytes: report.storage_bytes,
3587                namespaces: report
3588                    .namespaces
3589                    .into_iter()
3590                    .map(namespace_stats_to_proto)
3591                    .collect(),
3592                maintenance: Some(maintenance_stats_to_proto(report.maintenance)),
3593                engine: Some(engine_tuning_info_to_proto(report.engine)),
3594            }))
3595        }
3596        .await;
3597        record_grpc_result(&self.runtime.metrics().grpc_stats, result)
3598    }
3599
3600    async fn integrity_check(
3601        &self,
3602        request: Request<ProtoIntegrityCheckRequest>,
3603    ) -> Result<Response<ProtoIntegrityCheckReply>, Status> {
3604        self.runtime.metrics().grpc_integrity.record_started();
3605        validate_grpc_auth(
3606            &request,
3607            self.runtime.auth().as_ref(),
3608            AuthPermission::Admin,
3609        )?;
3610        let principal = grpc_principal(&request);
3611        let started_at_unix_ms = now_unix_ms();
3612        let correlation_id = self.runtime.traces().next_id("corr");
3613        let result = async {
3614            let request = request.into_inner();
3615            let tenant_id = request.tenant_id.clone();
3616            let namespace = request.namespace.clone();
3617            let _permit = self
3618                .runtime
3619                .admission()
3620                .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3621                .await
3622                .map_err(|error| {
3623                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3624                })?;
3625            let report = self
3626                .store
3627                .integrity_check(IntegrityCheckRequest {
3628                    tenant_id: tenant_id.clone(),
3629                    namespace: namespace.clone(),
3630                })
3631                .await
3632                .map_err(map_store_error)?;
3633            record_trace(
3634                &self.runtime,
3635                TraceOperationKind::IntegrityCheck,
3636                "grpc",
3637                tenant_id,
3638                namespace,
3639                principal,
3640                started_at_unix_ms,
3641                TraceStatus::Ok,
3642                None,
3643                OperationTraceSummary::default(),
3644                None,
3645                correlation_id,
3646            );
3647
3648            Ok(Response::new(ProtoIntegrityCheckReply {
3649                generated_at_unix_ms: report.generated_at_unix_ms,
3650                healthy: report.healthy,
3651                scanned_records: report.scanned_records,
3652                scanned_idempotency_keys: report.scanned_idempotency_keys,
3653                stale_idempotency_keys: report.stale_idempotency_keys,
3654                missing_idempotency_keys: report.missing_idempotency_keys,
3655                duplicate_active_records: report.duplicate_active_records,
3656            }))
3657        }
3658        .await;
3659        record_grpc_result(&self.runtime.metrics().grpc_integrity, result)
3660    }
3661
3662    async fn repair(
3663        &self,
3664        request: Request<ProtoRepairRequest>,
3665    ) -> Result<Response<ProtoRepairReply>, Status> {
3666        self.runtime.metrics().grpc_repair.record_started();
3667        validate_grpc_auth(
3668            &request,
3669            self.runtime.auth().as_ref(),
3670            AuthPermission::Admin,
3671        )?;
3672        let principal = grpc_principal(&request);
3673        let started_at_unix_ms = now_unix_ms();
3674        let correlation_id = self.runtime.traces().next_id("corr");
3675        let result = async {
3676            let request = request.into_inner();
3677            let tenant_id = request.tenant_id.clone();
3678            let namespace = request.namespace.clone();
3679            let _permit = self
3680                .runtime
3681                .admission()
3682                .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3683                .await
3684                .map_err(|error| {
3685                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3686                })?;
3687            let report = self
3688                .store
3689                .repair(RepairRequest {
3690                    tenant_id: tenant_id.clone(),
3691                    namespace: namespace.clone(),
3692                    dry_run: request.dry_run,
3693                    reason: request.reason,
3694                    remove_stale_idempotency_keys: request.remove_stale_idempotency_keys,
3695                    rebuild_missing_idempotency_keys: request.rebuild_missing_idempotency_keys,
3696                })
3697                .await
3698                .map_err(map_store_error)?;
3699            record_trace(
3700                &self.runtime,
3701                TraceOperationKind::Repair,
3702                "grpc",
3703                tenant_id,
3704                namespace,
3705                principal,
3706                started_at_unix_ms,
3707                TraceStatus::Ok,
3708                None,
3709                OperationTraceSummary {
3710                    dry_run: Some(request.dry_run),
3711                    ..OperationTraceSummary::default()
3712                },
3713                None,
3714                correlation_id,
3715            );
3716
3717            Ok(Response::new(ProtoRepairReply {
3718                dry_run: report.dry_run,
3719                scanned_records: report.scanned_records,
3720                scanned_idempotency_keys: report.scanned_idempotency_keys,
3721                removed_stale_idempotency_keys: report.removed_stale_idempotency_keys,
3722                rebuilt_missing_idempotency_keys: report.rebuilt_missing_idempotency_keys,
3723                healthy_after: report.healthy_after,
3724            }))
3725        }
3726        .await;
3727        record_grpc_result(&self.runtime.metrics().grpc_repair, result)
3728    }
3729
3730    async fn list_traces(
3731        &self,
3732        request: Request<ProtoListTracesRequest>,
3733    ) -> Result<Response<ProtoListTracesReply>, Status> {
3734        validate_grpc_auth(
3735            &request,
3736            self.runtime.auth().as_ref(),
3737            AuthPermission::Admin,
3738        )?;
3739        let request = request.into_inner();
3740        self.runtime
3741            .metrics()
3742            .trace_reads
3743            .fetch_add(1, Ordering::Relaxed);
3744        let traces = self.runtime.traces().list(&TraceListRequest {
3745            tenant_id: request.tenant_id,
3746            namespace: request.namespace,
3747            operation: ProtoTraceOperationKind::try_from(request.operation)
3748                .ok()
3749                .and_then(trace_operation_kind_from_proto),
3750            status: ProtoTraceStatus::try_from(request.status)
3751                .ok()
3752                .and_then(trace_status_from_proto),
3753            before_started_at_unix_ms: request.before_started_at_unix_ms,
3754            limit: request.limit.map(|value| value as usize),
3755        });
3756        Ok(Response::new(ProtoListTracesReply {
3757            traces: traces.into_iter().map(operation_trace_to_proto).collect(),
3758        }))
3759    }
3760
3761    async fn get_trace(
3762        &self,
3763        request: Request<ProtoGetTraceRequest>,
3764    ) -> Result<Response<ProtoOperationTrace>, Status> {
3765        validate_grpc_auth(
3766            &request,
3767            self.runtime.auth().as_ref(),
3768            AuthPermission::Admin,
3769        )?;
3770        let trace_id = request.into_inner().trace_id;
3771        self.runtime
3772            .metrics()
3773            .trace_reads
3774            .fetch_add(1, Ordering::Relaxed);
3775        let trace = self
3776            .runtime
3777            .traces()
3778            .get(&trace_id)
3779            .ok_or_else(|| Status::not_found(format!("trace {trace_id} not found")))?;
3780        Ok(Response::new(operation_trace_to_proto(trace)))
3781    }
3782
3783    async fn export(
3784        &self,
3785        request: Request<ProtoExportRequest>,
3786    ) -> Result<Response<ProtoExportReply>, Status> {
3787        validate_grpc_auth(
3788            &request,
3789            self.runtime.auth().as_ref(),
3790            AuthPermission::Admin,
3791        )?;
3792        let principal = grpc_principal(&request);
3793        let started_at_unix_ms = now_unix_ms();
3794        let correlation_id = self.runtime.traces().next_id("corr");
3795        let request = request.into_inner();
3796        let export_request = ExportRequest {
3797            tenant_id: request.tenant_id,
3798            namespace: request.namespace,
3799            include_archived: request.include_archived,
3800        };
3801        let _permit = self
3802            .runtime
3803            .admission()
3804            .acquire(AdmissionClass::Admin, export_request.tenant_id.as_deref())
3805            .await
3806            .map_err(|error| map_grpc_admission_error(self.runtime.metrics().as_ref(), error))?;
3807        let package = self
3808            .store
3809            .export(export_request.clone())
3810            .await
3811            .map_err(map_store_error)?;
3812        record_trace(
3813            &self.runtime,
3814            TraceOperationKind::Export,
3815            "grpc",
3816            export_request.tenant_id,
3817            export_request.namespace,
3818            principal,
3819            started_at_unix_ms,
3820            TraceStatus::Ok,
3821            None,
3822            OperationTraceSummary::default(),
3823            None,
3824            correlation_id,
3825        );
3826        Ok(Response::new(portable_package_to_proto(package)))
3827    }
3828
3829    async fn import(
3830        &self,
3831        request: Request<ProtoImportRequest>,
3832    ) -> Result<Response<ProtoImportReply>, Status> {
3833        validate_grpc_auth(
3834            &request,
3835            self.runtime.auth().as_ref(),
3836            AuthPermission::Admin,
3837        )?;
3838        let principal = grpc_principal(&request);
3839        let started_at_unix_ms = now_unix_ms();
3840        let correlation_id = self.runtime.traces().next_id("corr");
3841        let request = request.into_inner();
3842        let import_request = ImportRequest {
3843            package: portable_package_from_proto(
3844                request
3845                    .package
3846                    .ok_or_else(|| invalid_argument("portable package is required"))?,
3847            )?,
3848            mode: import_mode_from_proto(request.mode)?,
3849            dry_run: request.dry_run,
3850        };
3851        let tenant_id = import_request
3852            .package
3853            .records
3854            .first()
3855            .map(|entry| entry.record.scope.tenant_id.clone());
3856        let namespace = import_request
3857            .package
3858            .records
3859            .first()
3860            .map(|entry| entry.record.scope.namespace.clone());
3861        let _permit = self
3862            .runtime
3863            .admission()
3864            .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3865            .await
3866            .map_err(|error| map_grpc_admission_error(self.runtime.metrics().as_ref(), error))?;
3867        let report = self
3868            .store
3869            .import(import_request.clone())
3870            .await
3871            .map_err(map_store_error)?;
3872        record_trace(
3873            &self.runtime,
3874            TraceOperationKind::Import,
3875            "grpc",
3876            tenant_id,
3877            namespace,
3878            principal,
3879            started_at_unix_ms,
3880            if report.failed_records.is_empty() {
3881                TraceStatus::Ok
3882            } else {
3883                TraceStatus::Error
3884            },
3885            (!report.failed_records.is_empty())
3886                .then(|| format!("{} import validation failures", report.failed_records.len())),
3887            OperationTraceSummary {
3888                request_count: Some(import_request.package.records.len() as u32),
3889                dry_run: Some(import_request.dry_run),
3890                ..OperationTraceSummary::default()
3891            },
3892            None,
3893            correlation_id,
3894        );
3895        Ok(Response::new(ProtoImportReply {
3896            mode: request.mode,
3897            dry_run: report.dry_run,
3898            applied: report.applied,
3899            compatible_package: report.compatible_package,
3900            package_version: report.package_version,
3901            validated_records: report.validated_records,
3902            imported_records: report.imported_records,
3903            skipped_records: report.skipped_records,
3904            replaced_existing: report.replaced_existing,
3905            snapshot_id: report.snapshot_id,
3906            failed_records: report
3907                .failed_records
3908                .into_iter()
3909                .map(|failure| mnemara_protocol::v1::ImportFailure {
3910                    record_id: failure.record_id,
3911                    reason: failure.reason,
3912                })
3913                .collect(),
3914        }))
3915    }
3916}