Skip to main content

mnemara_server/
lib.rs

1#![forbid(unsafe_code)]
2
3mod admission;
4mod observability;
5
6use std::net::SocketAddr;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9
10use admission::{
11    AdmissionClass, AdmissionConfig, AdmissionController, AdmissionError, RuntimeAdmissionStatus,
12};
13use axum::extract::{Path, Query, Request as AxumRequest};
14use axum::http::HeaderMap;
15use axum::http::StatusCode;
16use axum::middleware::{self, Next};
17use axum::response::IntoResponse;
18use axum::routing::{get, post};
19use axum::{Json, Router};
20use mnemara_core::{
21    AffectiveAnnotation, AffectiveAnnotationProvenance, ArchiveReceipt, ArchiveRequest,
22    ArtifactPointer, BatchUpsertRequest, CompactionReport, CompactionRequest, ConflictAnnotation,
23    ConflictResolutionKind, ConflictReviewState, DeleteReceipt, DeleteRequest,
24    EPISODE_SCHEMA_VERSION, EmbeddingProviderKind, EngineTuningInfo, EpisodeContext,
25    EpisodeContinuityState, EpisodeSalience, ExportRequest, ImportMode, ImportReport,
26    ImportRequest, IntegrityCheckReport, IntegrityCheckRequest, LineageLink, LineageRelationKind,
27    MaintenanceStats, MemoryHistoricalState, MemoryQualityState, MemoryRecord, MemoryRecordKind,
28    MemoryScope, MemoryStore, MemoryTrustLevel, NamespaceStats, OperationTrace,
29    OperationTraceSummary, PortableRecord, PortableStorePackage, RecallCandidateSource,
30    RecallExplanation, RecallFilters, RecallHistoricalMode, RecallPlannerStage,
31    RecallPlanningProfile, RecallPlanningTrace, RecallPolicyProfile, RecallQuery, RecallResult,
32    RecallScorerKind, RecallScoringProfile, RecallTemporalOrder, RecallTraceCandidate,
33    RecoverReceipt, RecoverRequest, RepairReport, RepairRequest, SnapshotManifest,
34    StoreStatsReport, StoreStatsRequest, SuppressReceipt, SuppressRequest, TraceListRequest,
35    TraceOperationKind, TraceStatus, UpsertReceipt, UpsertRequest,
36};
37use mnemara_protocol::v1::memory_service_server::{MemoryService, MemoryServiceServer};
38use mnemara_protocol::v1::{
39    AffectiveAnnotation as ProtoAffectiveAnnotation, ArchiveReply,
40    ArchiveRequest as ProtoArchiveRequest, ArtifactPointer as ProtoArtifactPointer,
41    BatchUpsertMemoryRecordsReply, BatchUpsertMemoryRecordsRequest, CompactReply,
42    CompactRequest as ProtoCompactRequest, ConflictAnnotation as ProtoConflictAnnotation,
43    DeleteReply, DeleteRequest as ProtoDeleteRequest,
44    EmbeddingProviderKind as ProtoEmbeddingProviderKind, EngineTuningInfo as ProtoEngineTuningInfo,
45    EpisodeContext as ProtoEpisodeContext, EpisodeSalience as ProtoEpisodeSalience,
46    ExportReply as ProtoExportReply, ExportRequest as ProtoExportRequest,
47    GetTraceRequest as ProtoGetTraceRequest, ImportMode as ProtoImportMode,
48    ImportReply as ProtoImportReply, ImportRequest as ProtoImportRequest,
49    IntegrityCheckReply as ProtoIntegrityCheckReply,
50    IntegrityCheckRequest as ProtoIntegrityCheckRequest, LineageLink as ProtoLineageLink,
51    ListTracesReply as ProtoListTracesReply, ListTracesRequest as ProtoListTracesRequest,
52    MaintenanceStats as ProtoMaintenanceStats, MemoryRecord as ProtoMemoryRecord,
53    MemoryScope as ProtoMemoryScope, NamespaceStats as ProtoNamespaceStats,
54    OperationTrace as ProtoOperationTrace, OperationTraceSummary as ProtoOperationTraceSummary,
55    PortableRecord as ProtoPortableRecord, RecallExplanation as ProtoRecallExplanation,
56    RecallFilters as ProtoRecallFilters, RecallHit as ProtoRecallHit,
57    RecallPlanningTrace as ProtoRecallPlanningTrace,
58    RecallPolicyProfile as ProtoRecallPolicyProfile, RecallReply,
59    RecallRequest as ProtoRecallRequest, RecallScoreBreakdown as ProtoRecallScoreBreakdown,
60    RecallScorerKind as ProtoRecallScorerKind, RecallScoringProfile as ProtoRecallScoringProfile,
61    RecallTraceCandidate as ProtoRecallTraceCandidate, RecoverReply,
62    RecoverRequest as ProtoRecoverRequest, RepairReply as ProtoRepairReply,
63    RepairRequest as ProtoRepairRequest, SnapshotReply, SnapshotRequest,
64    StoreStatsReply as ProtoStoreStatsReply, StoreStatsRequest as ProtoStoreStatsRequest,
65    SuppressReply, SuppressRequest as ProtoSuppressRequest,
66    TraceOperationKind as ProtoTraceOperationKind, TraceStatus as ProtoTraceStatus,
67    UpsertMemoryRecordReply, UpsertMemoryRecordRequest as ProtoUpsertMemoryRecordRequest,
68};
69use observability::{TraceRegistry, TraceRegistrySnapshot, now_unix_ms};
70use tonic::{Request, Response, Status};
71
72#[derive(Debug, Clone, serde::Serialize)]
73struct HealthStatus {
74    status: &'static str,
75}
76
77#[derive(Debug, Clone, serde::Serialize)]
78struct ReadyStatus {
79    status: &'static str,
80    record_count: u64,
81    namespaces: Vec<String>,
82}
83
84#[derive(Debug, Clone, serde::Serialize)]
85struct ServerRuntimeStatus {
86    backend: String,
87    admission: RuntimeAdmissionStatus,
88    traces: TraceRegistrySnapshot,
89}
90
91#[derive(Debug, Clone, serde::Serialize)]
92struct HttpErrorBody {
93    error: String,
94}
95
96#[derive(Debug, Default)]
97struct MethodMetrics {
98    started: AtomicU64,
99    ok: AtomicU64,
100    invalid_argument: AtomicU64,
101    conflict: AtomicU64,
102    unimplemented: AtomicU64,
103    internal: AtomicU64,
104}
105
106impl MethodMetrics {
107    fn record_started(&self) {
108        self.started.fetch_add(1, Ordering::Relaxed);
109    }
110
111    fn record_status(&self, status: &Status) {
112        match status.code() {
113            tonic::Code::Ok => {
114                self.ok.fetch_add(1, Ordering::Relaxed);
115            }
116            tonic::Code::InvalidArgument => {
117                self.invalid_argument.fetch_add(1, Ordering::Relaxed);
118            }
119            tonic::Code::AlreadyExists => {
120                self.conflict.fetch_add(1, Ordering::Relaxed);
121            }
122            tonic::Code::Unimplemented => {
123                self.unimplemented.fetch_add(1, Ordering::Relaxed);
124            }
125            _ => {
126                self.internal.fetch_add(1, Ordering::Relaxed);
127            }
128        }
129    }
130}
131
132#[derive(Debug, Default)]
133pub struct ServerMetrics {
134    grpc_upsert: MethodMetrics,
135    grpc_batch_upsert: MethodMetrics,
136    grpc_recall: MethodMetrics,
137    grpc_compact: MethodMetrics,
138    grpc_snapshot: MethodMetrics,
139    grpc_delete: MethodMetrics,
140    grpc_archive: MethodMetrics,
141    grpc_suppress: MethodMetrics,
142    grpc_recover: MethodMetrics,
143    grpc_stats: MethodMetrics,
144    grpc_integrity: MethodMetrics,
145    grpc_repair: MethodMetrics,
146    http_healthz: AtomicU64,
147    http_readyz: AtomicU64,
148    http_snapshot: AtomicU64,
149    http_compact: AtomicU64,
150    http_delete: AtomicU64,
151    http_archive: AtomicU64,
152    http_suppress: AtomicU64,
153    http_recover: AtomicU64,
154    http_metrics: AtomicU64,
155    http_traces: AtomicU64,
156    http_runtime: AtomicU64,
157    http_export: AtomicU64,
158    http_import: AtomicU64,
159    http_rejected_body_too_large: AtomicU64,
160    admission_rejected: AtomicU64,
161    admission_timed_out: AtomicU64,
162    trace_records: AtomicU64,
163    trace_evictions: AtomicU64,
164    trace_reads: AtomicU64,
165}
166
167impl ServerMetrics {
168    pub fn render(&self) -> String {
169        let mut output = String::new();
170        append_counter(
171            &mut output,
172            "mnemara_grpc_upsert_requests_started_total",
173            self.grpc_upsert.started.load(Ordering::Relaxed),
174        );
175        append_counter(
176            &mut output,
177            "mnemara_grpc_upsert_requests_ok_total",
178            self.grpc_upsert.ok.load(Ordering::Relaxed),
179        );
180        append_counter(
181            &mut output,
182            "mnemara_grpc_upsert_requests_invalid_argument_total",
183            self.grpc_upsert.invalid_argument.load(Ordering::Relaxed),
184        );
185        append_counter(
186            &mut output,
187            "mnemara_grpc_upsert_requests_conflict_total",
188            self.grpc_upsert.conflict.load(Ordering::Relaxed),
189        );
190        append_counter(
191            &mut output,
192            "mnemara_grpc_upsert_requests_unimplemented_total",
193            self.grpc_upsert.unimplemented.load(Ordering::Relaxed),
194        );
195        append_counter(
196            &mut output,
197            "mnemara_grpc_upsert_requests_internal_total",
198            self.grpc_upsert.internal.load(Ordering::Relaxed),
199        );
200        append_counter(
201            &mut output,
202            "mnemara_grpc_batch_upsert_requests_started_total",
203            self.grpc_batch_upsert.started.load(Ordering::Relaxed),
204        );
205        append_counter(
206            &mut output,
207            "mnemara_grpc_batch_upsert_requests_ok_total",
208            self.grpc_batch_upsert.ok.load(Ordering::Relaxed),
209        );
210        append_counter(
211            &mut output,
212            "mnemara_grpc_batch_upsert_requests_invalid_argument_total",
213            self.grpc_batch_upsert
214                .invalid_argument
215                .load(Ordering::Relaxed),
216        );
217        append_counter(
218            &mut output,
219            "mnemara_grpc_recall_requests_started_total",
220            self.grpc_recall.started.load(Ordering::Relaxed),
221        );
222        append_counter(
223            &mut output,
224            "mnemara_grpc_recall_requests_ok_total",
225            self.grpc_recall.ok.load(Ordering::Relaxed),
226        );
227        append_counter(
228            &mut output,
229            "mnemara_grpc_recall_requests_invalid_argument_total",
230            self.grpc_recall.invalid_argument.load(Ordering::Relaxed),
231        );
232        append_counter(
233            &mut output,
234            "mnemara_grpc_compact_requests_started_total",
235            self.grpc_compact.started.load(Ordering::Relaxed),
236        );
237        append_counter(
238            &mut output,
239            "mnemara_grpc_compact_requests_ok_total",
240            self.grpc_compact.ok.load(Ordering::Relaxed),
241        );
242        append_counter(
243            &mut output,
244            "mnemara_grpc_snapshot_requests_started_total",
245            self.grpc_snapshot.started.load(Ordering::Relaxed),
246        );
247        append_counter(
248            &mut output,
249            "mnemara_grpc_snapshot_requests_ok_total",
250            self.grpc_snapshot.ok.load(Ordering::Relaxed),
251        );
252        append_counter(
253            &mut output,
254            "mnemara_grpc_delete_requests_started_total",
255            self.grpc_delete.started.load(Ordering::Relaxed),
256        );
257        append_counter(
258            &mut output,
259            "mnemara_grpc_delete_requests_ok_total",
260            self.grpc_delete.ok.load(Ordering::Relaxed),
261        );
262        append_counter(
263            &mut output,
264            "mnemara_grpc_archive_requests_started_total",
265            self.grpc_archive.started.load(Ordering::Relaxed),
266        );
267        append_counter(
268            &mut output,
269            "mnemara_grpc_archive_requests_ok_total",
270            self.grpc_archive.ok.load(Ordering::Relaxed),
271        );
272        append_counter(
273            &mut output,
274            "mnemara_grpc_suppress_requests_started_total",
275            self.grpc_suppress.started.load(Ordering::Relaxed),
276        );
277        append_counter(
278            &mut output,
279            "mnemara_grpc_suppress_requests_ok_total",
280            self.grpc_suppress.ok.load(Ordering::Relaxed),
281        );
282        append_counter(
283            &mut output,
284            "mnemara_grpc_recover_requests_started_total",
285            self.grpc_recover.started.load(Ordering::Relaxed),
286        );
287        append_counter(
288            &mut output,
289            "mnemara_grpc_recover_requests_ok_total",
290            self.grpc_recover.ok.load(Ordering::Relaxed),
291        );
292        append_counter(
293            &mut output,
294            "mnemara_http_healthz_requests_total",
295            self.http_healthz.load(Ordering::Relaxed),
296        );
297        append_counter(
298            &mut output,
299            "mnemara_http_readyz_requests_total",
300            self.http_readyz.load(Ordering::Relaxed),
301        );
302        append_counter(
303            &mut output,
304            "mnemara_http_snapshot_requests_total",
305            self.http_snapshot.load(Ordering::Relaxed),
306        );
307        append_counter(
308            &mut output,
309            "mnemara_http_compact_requests_total",
310            self.http_compact.load(Ordering::Relaxed),
311        );
312        append_counter(
313            &mut output,
314            "mnemara_http_delete_requests_total",
315            self.http_delete.load(Ordering::Relaxed),
316        );
317        append_counter(
318            &mut output,
319            "mnemara_http_archive_requests_total",
320            self.http_archive.load(Ordering::Relaxed),
321        );
322        append_counter(
323            &mut output,
324            "mnemara_http_suppress_requests_total",
325            self.http_suppress.load(Ordering::Relaxed),
326        );
327        append_counter(
328            &mut output,
329            "mnemara_http_recover_requests_total",
330            self.http_recover.load(Ordering::Relaxed),
331        );
332        append_counter(
333            &mut output,
334            "mnemara_http_metrics_requests_total",
335            self.http_metrics.load(Ordering::Relaxed),
336        );
337        append_counter(
338            &mut output,
339            "mnemara_http_traces_requests_total",
340            self.http_traces.load(Ordering::Relaxed),
341        );
342        append_counter(
343            &mut output,
344            "mnemara_http_runtime_requests_total",
345            self.http_runtime.load(Ordering::Relaxed),
346        );
347        append_counter(
348            &mut output,
349            "mnemara_http_export_requests_total",
350            self.http_export.load(Ordering::Relaxed),
351        );
352        append_counter(
353            &mut output,
354            "mnemara_http_import_requests_total",
355            self.http_import.load(Ordering::Relaxed),
356        );
357        append_counter(
358            &mut output,
359            "mnemara_http_rejected_body_too_large_total",
360            self.http_rejected_body_too_large.load(Ordering::Relaxed),
361        );
362        append_counter(
363            &mut output,
364            "mnemara_admission_rejected_total",
365            self.admission_rejected.load(Ordering::Relaxed),
366        );
367        append_counter(
368            &mut output,
369            "mnemara_admission_timed_out_total",
370            self.admission_timed_out.load(Ordering::Relaxed),
371        );
372        append_counter(
373            &mut output,
374            "mnemara_trace_records_total",
375            self.trace_records.load(Ordering::Relaxed),
376        );
377        append_counter(
378            &mut output,
379            "mnemara_trace_evictions_total",
380            self.trace_evictions.load(Ordering::Relaxed),
381        );
382        append_counter(
383            &mut output,
384            "mnemara_trace_reads_total",
385            self.trace_reads.load(Ordering::Relaxed),
386        );
387        output
388    }
389}
390
391fn append_counter(output: &mut String, name: &str, value: u64) {
392    output.push_str("# TYPE ");
393    output.push_str(name);
394    output.push_str(" counter\n");
395    output.push_str(name);
396    output.push(' ');
397    output.push_str(&value.to_string());
398    output.push('\n');
399}
400
401#[derive(Debug, Clone)]
402pub struct ServerLimits {
403    pub max_http_body_bytes: usize,
404    pub max_batch_upsert_requests: usize,
405    pub max_recall_items: usize,
406    pub max_query_text_bytes: usize,
407    pub max_record_content_bytes: usize,
408    pub max_labels_per_scope: usize,
409    pub max_inflight_reads: usize,
410    pub max_inflight_writes: usize,
411    pub max_inflight_admin: usize,
412    pub max_queued_requests: usize,
413    pub max_tenant_inflight: usize,
414    pub queue_wait_timeout_ms: u64,
415    pub trace_retention: usize,
416}
417
418impl Default for ServerLimits {
419    fn default() -> Self {
420        Self {
421            max_http_body_bytes: 64 * 1024,
422            max_batch_upsert_requests: 128,
423            max_recall_items: 64,
424            max_query_text_bytes: 4 * 1024,
425            max_record_content_bytes: 32 * 1024,
426            max_labels_per_scope: 32,
427            max_inflight_reads: 64,
428            max_inflight_writes: 32,
429            max_inflight_admin: 8,
430            max_queued_requests: 256,
431            max_tenant_inflight: 16,
432            queue_wait_timeout_ms: 2_000,
433            trace_retention: 256,
434        }
435    }
436}
437
438#[derive(Debug, Clone, Default)]
439pub struct AuthConfig {
440    pub bearer_token: Option<String>,
441    pub protect_metrics: bool,
442    pub token_policies: Vec<TokenPolicy>,
443}
444
445#[derive(Debug, Clone, Copy, PartialEq, Eq)]
446pub enum AuthPermission {
447    Read,
448    Write,
449    Admin,
450    Metrics,
451}
452
453#[derive(Debug, Clone, PartialEq, Eq)]
454pub struct TokenPolicy {
455    pub token: String,
456    pub permissions: Vec<AuthPermission>,
457}
458
459impl AuthConfig {
460    fn has_authentication(&self) -> bool {
461        self.bearer_token.is_some() || !self.token_policies.is_empty()
462    }
463
464    fn authorize(&self, authorization_header: Option<&str>, permission: AuthPermission) -> bool {
465        if !self.has_authentication() {
466            return true;
467        }
468
469        let Some(token) = bearer_token_from_header(authorization_header) else {
470            return false;
471        };
472
473        if self.bearer_token.as_deref() == Some(token) {
474            return true;
475        }
476
477        self.token_policies
478            .iter()
479            .any(|policy| policy.token == token && policy.permissions.contains(&permission))
480    }
481}
482
483#[derive(Debug, Clone, serde::Deserialize)]
484pub struct DeleteHttpRequest {
485    pub tenant_id: String,
486    pub namespace: String,
487    pub record_id: String,
488    pub hard_delete: bool,
489    pub audit_reason: String,
490}
491
492#[derive(Debug, Clone, serde::Deserialize)]
493pub struct ArchiveHttpRequest {
494    pub tenant_id: String,
495    pub namespace: String,
496    pub record_id: String,
497    pub dry_run: bool,
498    pub audit_reason: String,
499}
500
501#[derive(Debug, Clone, serde::Deserialize)]
502pub struct SuppressHttpRequest {
503    pub tenant_id: String,
504    pub namespace: String,
505    pub record_id: String,
506    pub dry_run: bool,
507    pub audit_reason: String,
508}
509
510#[derive(Debug, Clone, serde::Deserialize)]
511pub struct RecoverHttpRequest {
512    pub tenant_id: String,
513    pub namespace: String,
514    pub record_id: String,
515    pub dry_run: bool,
516    pub audit_reason: String,
517    pub quality_state: MemoryQualityState,
518    pub historical_state: Option<MemoryHistoricalState>,
519}
520
521pub struct GrpcMemoryService<S> {
522    store: Arc<S>,
523    runtime: ServerRuntime,
524}
525
526impl<S> Clone for GrpcMemoryService<S> {
527    fn clone(&self) -> Self {
528        Self {
529            store: Arc::clone(&self.store),
530            runtime: self.runtime.clone(),
531        }
532    }
533}
534
535#[derive(Debug)]
536struct ServerRuntimeInner {
537    backend: Arc<String>,
538    limits: Arc<ServerLimits>,
539    metrics: Arc<ServerMetrics>,
540    auth: Arc<AuthConfig>,
541    traces: Arc<TraceRegistry>,
542    admission: Arc<AdmissionController>,
543}
544
545#[derive(Debug, Clone)]
546pub struct ServerRuntime {
547    inner: Arc<ServerRuntimeInner>,
548}
549
550impl ServerRuntime {
551    pub fn new(
552        backend: impl Into<String>,
553        limits: ServerLimits,
554        metrics: Arc<ServerMetrics>,
555        auth: AuthConfig,
556    ) -> Self {
557        let admission = AdmissionController::new(AdmissionConfig {
558            max_inflight_reads: limits.max_inflight_reads,
559            max_inflight_writes: limits.max_inflight_writes,
560            max_inflight_admin: limits.max_inflight_admin,
561            max_queued_requests: limits.max_queued_requests,
562            max_tenant_inflight: limits.max_tenant_inflight,
563            queue_wait_timeout_ms: limits.queue_wait_timeout_ms,
564        });
565        Self {
566            inner: Arc::new(ServerRuntimeInner {
567                backend: Arc::new(backend.into()),
568                traces: Arc::new(TraceRegistry::new(limits.trace_retention)),
569                admission: Arc::new(admission),
570                auth: Arc::new(auth),
571                metrics,
572                limits: Arc::new(limits),
573            }),
574        }
575    }
576
577    fn limits(&self) -> &Arc<ServerLimits> {
578        &self.inner.limits
579    }
580
581    fn metrics(&self) -> &Arc<ServerMetrics> {
582        &self.inner.metrics
583    }
584
585    fn backend(&self) -> &Arc<String> {
586        &self.inner.backend
587    }
588
589    fn auth(&self) -> &Arc<AuthConfig> {
590        &self.inner.auth
591    }
592
593    fn traces(&self) -> &Arc<TraceRegistry> {
594        &self.inner.traces
595    }
596
597    fn admission(&self) -> &Arc<AdmissionController> {
598        &self.inner.admission
599    }
600}
601
602impl<S: MemoryStore> GrpcMemoryService<S> {
603    pub fn new(store: Arc<S>) -> Self {
604        let backend = store.as_ref().backend_kind().to_string();
605        Self::with_runtime(
606            store,
607            ServerRuntime::new(
608                backend,
609                ServerLimits::default(),
610                Arc::new(ServerMetrics::default()),
611                AuthConfig::default(),
612            ),
613        )
614    }
615
616    pub fn with_limits(store: Arc<S>, limits: ServerLimits) -> Self {
617        let backend = store.as_ref().backend_kind().to_string();
618        Self::with_runtime(
619            store,
620            ServerRuntime::new(
621                backend,
622                limits,
623                Arc::new(ServerMetrics::default()),
624                AuthConfig::default(),
625            ),
626        )
627    }
628
629    pub fn with_observability(
630        store: Arc<S>,
631        limits: ServerLimits,
632        metrics: Arc<ServerMetrics>,
633    ) -> Self {
634        let backend = store.as_ref().backend_kind().to_string();
635        Self::with_runtime(
636            store,
637            ServerRuntime::new(backend, limits, metrics, AuthConfig::default()),
638        )
639    }
640
641    pub fn with_runtime_config(
642        store: Arc<S>,
643        limits: ServerLimits,
644        metrics: Arc<ServerMetrics>,
645        auth: AuthConfig,
646    ) -> Self {
647        let backend = store.as_ref().backend_kind().to_string();
648        Self::with_runtime(store, ServerRuntime::new(backend, limits, metrics, auth))
649    }
650
651    pub fn with_runtime(store: Arc<S>, runtime: ServerRuntime) -> Self {
652        Self { store, runtime }
653    }
654
655    pub fn into_service(self) -> MemoryServiceServer<Self>
656    where
657        Self: MemoryService,
658    {
659        MemoryServiceServer::new(self)
660    }
661}
662
663pub async fn serve<S>(
664    addr: SocketAddr,
665    store: Arc<S>,
666    limits: ServerLimits,
667    auth: AuthConfig,
668) -> Result<(), tonic::transport::Error>
669where
670    S: MemoryStore + 'static,
671{
672    let backend = store.as_ref().backend_kind().to_string();
673    serve_with_runtime(
674        addr,
675        store,
676        ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
677    )
678    .await
679}
680
681pub async fn serve_with_runtime<S>(
682    addr: SocketAddr,
683    store: Arc<S>,
684    runtime: ServerRuntime,
685) -> Result<(), tonic::transport::Error>
686where
687    S: MemoryStore + 'static,
688{
689    tonic::transport::Server::builder()
690        .add_service(GrpcMemoryService::with_runtime(store, runtime).into_service())
691        .serve(addr)
692        .await
693}
694
695pub fn http_app<S>(store: Arc<S>, limits: ServerLimits, auth: AuthConfig) -> Router
696where
697    S: MemoryStore + 'static,
698{
699    let backend = store.as_ref().backend_kind().to_string();
700    http_app_with_runtime(
701        store,
702        ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
703    )
704}
705
706pub fn http_app_with_metrics<S>(
707    store: Arc<S>,
708    limits: ServerLimits,
709    metrics: Arc<ServerMetrics>,
710    auth: AuthConfig,
711) -> Router
712where
713    S: MemoryStore + 'static,
714{
715    let backend = store.as_ref().backend_kind().to_string();
716    http_app_with_runtime(store, ServerRuntime::new(backend, limits, metrics, auth))
717}
718
719pub fn http_app_with_runtime<S>(store: Arc<S>, runtime: ServerRuntime) -> Router
720where
721    S: MemoryStore + 'static,
722{
723    let limits = Arc::clone(runtime.limits());
724    let ready_store = Arc::clone(&store);
725    let upsert_store = Arc::clone(&store);
726    let batch_upsert_store = Arc::clone(&store);
727    let recall_store = Arc::clone(&store);
728    let snapshot_store = Arc::clone(&store);
729    let stats_store = Arc::clone(&store);
730    let integrity_store = Arc::clone(&store);
731    let repair_store = Arc::clone(&store);
732    let compact_store = Arc::clone(&store);
733    let delete_store = Arc::clone(&store);
734    let archive_store = Arc::clone(&store);
735    let suppress_store = Arc::clone(&store);
736    let recover_store = Arc::clone(&store);
737    let export_store = Arc::clone(&store);
738    let import_store = Arc::clone(&store);
739    let ready_runtime = runtime.clone();
740    let upsert_runtime = runtime.clone();
741    let batch_upsert_runtime = runtime.clone();
742    let recall_runtime = runtime.clone();
743    let snapshot_runtime = runtime.clone();
744    let stats_runtime = runtime.clone();
745    let integrity_runtime = runtime.clone();
746    let repair_runtime = runtime.clone();
747    let compact_runtime = runtime.clone();
748    let delete_runtime = runtime.clone();
749    let archive_runtime = runtime.clone();
750    let suppress_runtime = runtime.clone();
751    let recover_runtime = runtime.clone();
752    let traces_runtime = runtime.clone();
753    let trace_runtime = runtime.clone();
754    let export_runtime = runtime.clone();
755    let import_runtime = runtime.clone();
756    let runtime_status_runtime = runtime.clone();
757    let metrics_metrics = Arc::clone(runtime.metrics());
758    let middleware_limits = Arc::clone(&limits);
759    let middleware_metrics = Arc::clone(runtime.metrics());
760    let middleware_auth = Arc::clone(runtime.auth());
761
762    let health_metrics = Arc::clone(runtime.metrics());
763
764    Router::new()
765        .route(
766            "/healthz",
767            get(move || {
768                let metrics = Arc::clone(&health_metrics);
769                async move { healthz(metrics).await }
770            }),
771        )
772        .route(
773            "/readyz",
774            get(move || {
775                let store = Arc::clone(&ready_store);
776                let runtime = ready_runtime.clone();
777                async move { readyz(store, runtime).await }
778            }),
779        )
780        .route(
781            "/memory/upsert",
782            post(
783                move |headers: HeaderMap, Json(request): Json<UpsertRequest>| {
784                    let store = Arc::clone(&upsert_store);
785                    let runtime = upsert_runtime.clone();
786                    async move { upsert_http(store, request, headers, runtime).await }
787                },
788            ),
789        )
790        .route(
791            "/memory/batch-upsert",
792            post(
793                move |headers: HeaderMap, Json(request): Json<BatchUpsertRequest>| {
794                    let store = Arc::clone(&batch_upsert_store);
795                    let runtime = batch_upsert_runtime.clone();
796                    async move { batch_upsert_http(store, request, headers, runtime).await }
797                },
798            ),
799        )
800        .route(
801            "/memory/recall",
802            post(
803                move |headers: HeaderMap, Json(request): Json<RecallQuery>| {
804                    let store = Arc::clone(&recall_store);
805                    let runtime = recall_runtime.clone();
806                    async move { recall_http(store, request, headers, runtime).await }
807                },
808            ),
809        )
810        .route(
811            "/admin/snapshot",
812            get(move |headers: HeaderMap| {
813                let store = Arc::clone(&snapshot_store);
814                let runtime = snapshot_runtime.clone();
815                async move { snapshot_http(store, headers, runtime).await }
816            }),
817        )
818        .route(
819            "/admin/stats",
820            get(
821                move |headers: HeaderMap, Query(request): Query<StoreStatsRequest>| {
822                    let store = Arc::clone(&stats_store);
823                    let runtime = stats_runtime.clone();
824                    async move { stats_http(store, request, headers, runtime).await }
825                },
826            ),
827        )
828        .route(
829            "/admin/integrity",
830            get(
831                move |headers: HeaderMap, Query(request): Query<IntegrityCheckRequest>| {
832                    let store = Arc::clone(&integrity_store);
833                    let runtime = integrity_runtime.clone();
834                    async move { integrity_http(store, request, headers, runtime).await }
835                },
836            ),
837        )
838        .route(
839            "/admin/repair",
840            post(
841                move |headers: HeaderMap, Json(request): Json<RepairRequest>| {
842                    let store = Arc::clone(&repair_store);
843                    let runtime = repair_runtime.clone();
844                    async move { repair_http(store, request, headers, runtime).await }
845                },
846            ),
847        )
848        .route(
849            "/admin/compact",
850            post(
851                move |headers: HeaderMap, Json(request): Json<CompactionRequest>| {
852                    let store = Arc::clone(&compact_store);
853                    let runtime = compact_runtime.clone();
854                    async move { compact_http(store, request, headers, runtime).await }
855                },
856            ),
857        )
858        .route(
859            "/admin/delete",
860            post(
861                move |headers: HeaderMap, Json(request): Json<DeleteHttpRequest>| {
862                    let store = Arc::clone(&delete_store);
863                    let runtime = delete_runtime.clone();
864                    async move { delete_http(store, request, headers, runtime).await }
865                },
866            ),
867        )
868        .route(
869            "/admin/archive",
870            post(
871                move |headers: HeaderMap, Json(request): Json<ArchiveHttpRequest>| {
872                    let store = Arc::clone(&archive_store);
873                    let runtime = archive_runtime.clone();
874                    async move { archive_http(store, request, headers, runtime).await }
875                },
876            ),
877        )
878        .route(
879            "/admin/suppress",
880            post(
881                move |headers: HeaderMap, Json(request): Json<SuppressHttpRequest>| {
882                    let store = Arc::clone(&suppress_store);
883                    let runtime = suppress_runtime.clone();
884                    async move { suppress_http(store, request, headers, runtime).await }
885                },
886            ),
887        )
888        .route(
889            "/admin/recover",
890            post(
891                move |headers: HeaderMap, Json(request): Json<RecoverHttpRequest>| {
892                    let store = Arc::clone(&recover_store);
893                    let runtime = recover_runtime.clone();
894                    async move { recover_http(store, request, headers, runtime).await }
895                },
896            ),
897        )
898        .route(
899            "/admin/traces",
900            get(move |Query(request): Query<TraceListRequest>| {
901                let runtime = traces_runtime.clone();
902                async move { traces_http(request, runtime).await }
903            }),
904        )
905        .route(
906            "/admin/traces/{trace_id}",
907            get(move |Path(trace_id): Path<String>| {
908                let runtime = trace_runtime.clone();
909                async move { trace_http(trace_id, runtime).await }
910            }),
911        )
912        .route(
913            "/admin/runtime",
914            get(move || {
915                let runtime = runtime_status_runtime.clone();
916                async move { runtime_status_http(runtime).await }
917            }),
918        )
919        .route(
920            "/admin/export",
921            post(
922                move |headers: HeaderMap, Json(request): Json<ExportRequest>| {
923                    let store = Arc::clone(&export_store);
924                    let runtime = export_runtime.clone();
925                    async move { export_http(store, request, headers, runtime).await }
926                },
927            ),
928        )
929        .route(
930            "/admin/import",
931            post(
932                move |headers: HeaderMap, Json(request): Json<ImportRequest>| {
933                    let store = Arc::clone(&import_store);
934                    let runtime = import_runtime.clone();
935                    async move { import_http(store, request, headers, runtime).await }
936                },
937            ),
938        )
939        .route(
940            "/metrics",
941            get(move || {
942                let metrics = Arc::clone(&metrics_metrics);
943                async move { metrics_http(metrics).await }
944            }),
945        )
946        .layer(middleware::from_fn(move |request, next| {
947            let limits = Arc::clone(&middleware_limits);
948            let metrics = Arc::clone(&middleware_metrics);
949            let auth = Arc::clone(&middleware_auth);
950            async move { enforce_http_guardrails(request, next, limits, metrics, auth).await }
951        }))
952}
953
954pub async fn serve_http<S>(
955    addr: SocketAddr,
956    store: Arc<S>,
957    limits: ServerLimits,
958    auth: AuthConfig,
959) -> std::io::Result<()>
960where
961    S: MemoryStore + 'static,
962{
963    let backend = store.as_ref().backend_kind().to_string();
964    serve_http_with_runtime(
965        addr,
966        store,
967        ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
968    )
969    .await
970}
971
972pub async fn serve_http_with_runtime<S>(
973    addr: SocketAddr,
974    store: Arc<S>,
975    runtime: ServerRuntime,
976) -> std::io::Result<()>
977where
978    S: MemoryStore + 'static,
979{
980    let listener = tokio::net::TcpListener::bind(addr).await?;
981    axum::serve(listener, http_app_with_runtime(store, runtime)).await
982}
983
984async fn healthz(metrics: Arc<ServerMetrics>) -> Json<HealthStatus> {
985    metrics.http_healthz.fetch_add(1, Ordering::Relaxed);
986    Json(HealthStatus { status: "ok" })
987}
988
989async fn readyz<S>(
990    store: Arc<S>,
991    runtime: ServerRuntime,
992) -> Result<Json<ReadyStatus>, (StatusCode, Json<HttpErrorBody>)>
993where
994    S: MemoryStore + 'static,
995{
996    runtime
997        .metrics()
998        .http_readyz
999        .fetch_add(1, Ordering::Relaxed);
1000    let manifest = store.snapshot().await.map_err(map_http_store_error)?;
1001    Ok(Json(ReadyStatus {
1002        status: "ready",
1003        record_count: manifest.record_count,
1004        namespaces: manifest.namespaces,
1005    }))
1006}
1007
1008async fn upsert_http<S>(
1009    store: Arc<S>,
1010    request: UpsertRequest,
1011    headers: HeaderMap,
1012    runtime: ServerRuntime,
1013) -> Result<Json<UpsertReceipt>, (StatusCode, Json<HttpErrorBody>)>
1014where
1015    S: MemoryStore + 'static,
1016{
1017    let started_at_unix_ms = now_unix_ms();
1018    let correlation_id = runtime.traces().next_id("corr");
1019    validate_record_limits(&request.record, runtime.limits().as_ref())
1020        .map_err(map_http_validation_error)?;
1021    let _permit = runtime
1022        .admission()
1023        .acquire(AdmissionClass::Write, Some(&request.record.scope.tenant_id))
1024        .await
1025        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1026    let receipt = store
1027        .upsert(request.clone())
1028        .await
1029        .map_err(map_http_store_error)?;
1030    record_trace(
1031        &runtime,
1032        TraceOperationKind::Upsert,
1033        "http",
1034        Some(request.record.scope.tenant_id),
1035        Some(request.record.scope.namespace),
1036        http_principal(&headers),
1037        started_at_unix_ms,
1038        TraceStatus::Ok,
1039        None,
1040        OperationTraceSummary {
1041            record_id: Some(request.record.id),
1042            ..OperationTraceSummary::default()
1043        },
1044        None,
1045        correlation_id,
1046    );
1047    Ok(Json(receipt))
1048}
1049
1050async fn batch_upsert_http<S>(
1051    store: Arc<S>,
1052    request: BatchUpsertRequest,
1053    headers: HeaderMap,
1054    runtime: ServerRuntime,
1055) -> Result<Json<Vec<UpsertReceipt>>, (StatusCode, Json<HttpErrorBody>)>
1056where
1057    S: MemoryStore + 'static,
1058{
1059    let started_at_unix_ms = now_unix_ms();
1060    let correlation_id = runtime.traces().next_id("corr");
1061    let tenant_id = request
1062        .requests
1063        .first()
1064        .map(|item| item.record.scope.tenant_id.clone());
1065    let namespace = request
1066        .requests
1067        .first()
1068        .map(|item| item.record.scope.namespace.clone());
1069    if request.requests.len() > runtime.limits().max_batch_upsert_requests {
1070        return Err((
1071            StatusCode::BAD_REQUEST,
1072            Json(HttpErrorBody {
1073                error: format!(
1074                    "batch upsert request count exceeds configured max of {}",
1075                    runtime.limits().max_batch_upsert_requests
1076                ),
1077            }),
1078        ));
1079    }
1080    for item in &request.requests {
1081        validate_record_limits(&item.record, runtime.limits().as_ref())
1082            .map_err(map_http_validation_error)?;
1083    }
1084    let _permit = runtime
1085        .admission()
1086        .acquire(AdmissionClass::Write, tenant_id.as_deref())
1087        .await
1088        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1089    let receipts = store
1090        .batch_upsert(request.clone())
1091        .await
1092        .map_err(map_http_store_error)?;
1093    record_trace(
1094        &runtime,
1095        TraceOperationKind::BatchUpsert,
1096        "http",
1097        tenant_id,
1098        namespace,
1099        http_principal(&headers),
1100        started_at_unix_ms,
1101        TraceStatus::Ok,
1102        None,
1103        OperationTraceSummary {
1104            request_count: Some(request.requests.len() as u32),
1105            ..OperationTraceSummary::default()
1106        },
1107        None,
1108        correlation_id,
1109    );
1110    Ok(Json(receipts))
1111}
1112
1113async fn recall_http<S>(
1114    store: Arc<S>,
1115    request: RecallQuery,
1116    headers: HeaderMap,
1117    runtime: ServerRuntime,
1118) -> Result<Json<RecallResult>, (StatusCode, Json<HttpErrorBody>)>
1119where
1120    S: MemoryStore + 'static,
1121{
1122    let started_at_unix_ms = now_unix_ms();
1123    let correlation_id = runtime.traces().next_id("corr");
1124    validate_recall_limits(&request, runtime.limits().as_ref())
1125        .map_err(map_http_validation_error)?;
1126    let _permit = runtime
1127        .admission()
1128        .acquire(AdmissionClass::Read, Some(&request.scope.tenant_id))
1129        .await
1130        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1131    let mut result = store
1132        .recall(request.clone())
1133        .await
1134        .map_err(map_http_store_error)?;
1135    attach_correlation_id(&mut result, &correlation_id);
1136    record_trace(
1137        &runtime,
1138        TraceOperationKind::Recall,
1139        "http",
1140        Some(request.scope.tenant_id),
1141        Some(request.scope.namespace),
1142        http_principal(&headers),
1143        started_at_unix_ms,
1144        TraceStatus::Ok,
1145        None,
1146        OperationTraceSummary {
1147            query_text: Some(request.query_text),
1148            max_items: Some(request.max_items as u32),
1149            token_budget: request.token_budget.map(|value| value as u32),
1150            ..OperationTraceSummary::default()
1151        },
1152        result.explanation.clone(),
1153        correlation_id,
1154    );
1155    Ok(Json(result))
1156}
1157
1158async fn snapshot_http<S>(
1159    store: Arc<S>,
1160    headers: HeaderMap,
1161    runtime: ServerRuntime,
1162) -> Result<Json<SnapshotManifest>, (StatusCode, Json<HttpErrorBody>)>
1163where
1164    S: MemoryStore + 'static,
1165{
1166    runtime
1167        .metrics()
1168        .http_snapshot
1169        .fetch_add(1, Ordering::Relaxed);
1170    let started_at_unix_ms = now_unix_ms();
1171    let correlation_id = runtime.traces().next_id("corr");
1172    let _permit = runtime
1173        .admission()
1174        .acquire(AdmissionClass::Admin, None)
1175        .await
1176        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1177    let manifest = store.snapshot().await.map_err(map_http_store_error)?;
1178    record_trace(
1179        &runtime,
1180        TraceOperationKind::Snapshot,
1181        "http",
1182        None,
1183        None,
1184        http_principal(&headers),
1185        started_at_unix_ms,
1186        TraceStatus::Ok,
1187        None,
1188        OperationTraceSummary::default(),
1189        None,
1190        correlation_id,
1191    );
1192    Ok(Json(manifest))
1193}
1194
1195async fn stats_http<S>(
1196    store: Arc<S>,
1197    request: StoreStatsRequest,
1198    headers: HeaderMap,
1199    runtime: ServerRuntime,
1200) -> Result<Json<StoreStatsReport>, (StatusCode, Json<HttpErrorBody>)>
1201where
1202    S: MemoryStore + 'static,
1203{
1204    let started_at_unix_ms = now_unix_ms();
1205    let correlation_id = runtime.traces().next_id("corr");
1206    let _permit = runtime
1207        .admission()
1208        .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1209        .await
1210        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1211    let report = store
1212        .stats(request.clone())
1213        .await
1214        .map_err(map_http_store_error)?;
1215    record_trace(
1216        &runtime,
1217        TraceOperationKind::Stats,
1218        "http",
1219        request.tenant_id,
1220        request.namespace,
1221        http_principal(&headers),
1222        started_at_unix_ms,
1223        TraceStatus::Ok,
1224        None,
1225        OperationTraceSummary::default(),
1226        None,
1227        correlation_id,
1228    );
1229    Ok(Json(report))
1230}
1231
1232async fn integrity_http<S>(
1233    store: Arc<S>,
1234    request: IntegrityCheckRequest,
1235    headers: HeaderMap,
1236    runtime: ServerRuntime,
1237) -> Result<Json<IntegrityCheckReport>, (StatusCode, Json<HttpErrorBody>)>
1238where
1239    S: MemoryStore + 'static,
1240{
1241    let started_at_unix_ms = now_unix_ms();
1242    let correlation_id = runtime.traces().next_id("corr");
1243    let _permit = runtime
1244        .admission()
1245        .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1246        .await
1247        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1248    let report = store
1249        .integrity_check(request.clone())
1250        .await
1251        .map_err(map_http_store_error)?;
1252    record_trace(
1253        &runtime,
1254        TraceOperationKind::IntegrityCheck,
1255        "http",
1256        request.tenant_id,
1257        request.namespace,
1258        http_principal(&headers),
1259        started_at_unix_ms,
1260        TraceStatus::Ok,
1261        None,
1262        OperationTraceSummary::default(),
1263        None,
1264        correlation_id,
1265    );
1266    Ok(Json(report))
1267}
1268
1269async fn repair_http<S>(
1270    store: Arc<S>,
1271    request: RepairRequest,
1272    headers: HeaderMap,
1273    runtime: ServerRuntime,
1274) -> Result<Json<RepairReport>, (StatusCode, Json<HttpErrorBody>)>
1275where
1276    S: MemoryStore + 'static,
1277{
1278    let started_at_unix_ms = now_unix_ms();
1279    let correlation_id = runtime.traces().next_id("corr");
1280    let _permit = runtime
1281        .admission()
1282        .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1283        .await
1284        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1285    let report = store
1286        .repair(request.clone())
1287        .await
1288        .map_err(map_http_store_error)?;
1289    record_trace(
1290        &runtime,
1291        TraceOperationKind::Repair,
1292        "http",
1293        request.tenant_id,
1294        request.namespace,
1295        http_principal(&headers),
1296        started_at_unix_ms,
1297        TraceStatus::Ok,
1298        None,
1299        OperationTraceSummary {
1300            dry_run: Some(request.dry_run),
1301            ..OperationTraceSummary::default()
1302        },
1303        None,
1304        correlation_id,
1305    );
1306    Ok(Json(report))
1307}
1308
1309async fn compact_http<S>(
1310    store: Arc<S>,
1311    request: CompactionRequest,
1312    headers: HeaderMap,
1313    runtime: ServerRuntime,
1314) -> Result<Json<CompactionReport>, (StatusCode, Json<HttpErrorBody>)>
1315where
1316    S: MemoryStore + 'static,
1317{
1318    runtime
1319        .metrics()
1320        .http_compact
1321        .fetch_add(1, Ordering::Relaxed);
1322    let started_at_unix_ms = now_unix_ms();
1323    let correlation_id = runtime.traces().next_id("corr");
1324    let _permit = runtime
1325        .admission()
1326        .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1327        .await
1328        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1329    let report = store
1330        .compact(request.clone())
1331        .await
1332        .map_err(map_http_store_error)?;
1333    record_trace(
1334        &runtime,
1335        TraceOperationKind::Compact,
1336        "http",
1337        Some(request.tenant_id),
1338        request.namespace,
1339        http_principal(&headers),
1340        started_at_unix_ms,
1341        TraceStatus::Ok,
1342        None,
1343        OperationTraceSummary {
1344            dry_run: Some(request.dry_run),
1345            ..OperationTraceSummary::default()
1346        },
1347        None,
1348        correlation_id,
1349    );
1350    Ok(Json(report))
1351}
1352
1353async fn delete_http<S>(
1354    store: Arc<S>,
1355    request: DeleteHttpRequest,
1356    headers: HeaderMap,
1357    runtime: ServerRuntime,
1358) -> Result<Json<DeleteReceipt>, (StatusCode, Json<HttpErrorBody>)>
1359where
1360    S: MemoryStore + 'static,
1361{
1362    runtime
1363        .metrics()
1364        .http_delete
1365        .fetch_add(1, Ordering::Relaxed);
1366    let started_at_unix_ms = now_unix_ms();
1367    let correlation_id = runtime.traces().next_id("corr");
1368    let _permit = runtime
1369        .admission()
1370        .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1371        .await
1372        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1373    let receipt = store
1374        .delete(DeleteRequest {
1375            tenant_id: request.tenant_id.clone(),
1376            namespace: request.namespace.clone(),
1377            record_id: request.record_id,
1378            hard_delete: request.hard_delete,
1379            audit_reason: request.audit_reason,
1380        })
1381        .await
1382        .map_err(map_http_store_error)?;
1383    record_trace(
1384        &runtime,
1385        TraceOperationKind::Delete,
1386        "http",
1387        Some(request.tenant_id),
1388        Some(request.namespace),
1389        http_principal(&headers),
1390        started_at_unix_ms,
1391        TraceStatus::Ok,
1392        None,
1393        OperationTraceSummary {
1394            record_id: Some(receipt.record_id.clone()),
1395            ..OperationTraceSummary::default()
1396        },
1397        None,
1398        correlation_id,
1399    );
1400    Ok(Json(receipt))
1401}
1402
1403async fn archive_http<S>(
1404    store: Arc<S>,
1405    request: ArchiveHttpRequest,
1406    headers: HeaderMap,
1407    runtime: ServerRuntime,
1408) -> Result<Json<ArchiveReceipt>, (StatusCode, Json<HttpErrorBody>)>
1409where
1410    S: MemoryStore + 'static,
1411{
1412    runtime
1413        .metrics()
1414        .http_archive
1415        .fetch_add(1, Ordering::Relaxed);
1416    let started_at_unix_ms = now_unix_ms();
1417    let correlation_id = runtime.traces().next_id("corr");
1418    let _permit = runtime
1419        .admission()
1420        .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1421        .await
1422        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1423    let receipt = store
1424        .archive(ArchiveRequest {
1425            tenant_id: request.tenant_id.clone(),
1426            namespace: request.namespace.clone(),
1427            record_id: request.record_id,
1428            dry_run: request.dry_run,
1429            audit_reason: request.audit_reason,
1430        })
1431        .await
1432        .map_err(map_http_store_error)?;
1433    record_trace(
1434        &runtime,
1435        TraceOperationKind::Archive,
1436        "http",
1437        Some(request.tenant_id),
1438        Some(request.namespace),
1439        http_principal(&headers),
1440        started_at_unix_ms,
1441        TraceStatus::Ok,
1442        None,
1443        OperationTraceSummary {
1444            record_id: Some(receipt.record_id.clone()),
1445            dry_run: Some(receipt.dry_run),
1446            ..OperationTraceSummary::default()
1447        },
1448        None,
1449        correlation_id,
1450    );
1451    Ok(Json(receipt))
1452}
1453
1454async fn suppress_http<S>(
1455    store: Arc<S>,
1456    request: SuppressHttpRequest,
1457    headers: HeaderMap,
1458    runtime: ServerRuntime,
1459) -> Result<Json<SuppressReceipt>, (StatusCode, Json<HttpErrorBody>)>
1460where
1461    S: MemoryStore + 'static,
1462{
1463    runtime
1464        .metrics()
1465        .http_suppress
1466        .fetch_add(1, Ordering::Relaxed);
1467    let started_at_unix_ms = now_unix_ms();
1468    let correlation_id = runtime.traces().next_id("corr");
1469    let _permit = runtime
1470        .admission()
1471        .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1472        .await
1473        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1474    let receipt = store
1475        .suppress(SuppressRequest {
1476            tenant_id: request.tenant_id.clone(),
1477            namespace: request.namespace.clone(),
1478            record_id: request.record_id,
1479            dry_run: request.dry_run,
1480            audit_reason: request.audit_reason,
1481        })
1482        .await
1483        .map_err(map_http_store_error)?;
1484    record_trace(
1485        &runtime,
1486        TraceOperationKind::Suppress,
1487        "http",
1488        Some(request.tenant_id),
1489        Some(request.namespace),
1490        http_principal(&headers),
1491        started_at_unix_ms,
1492        TraceStatus::Ok,
1493        None,
1494        OperationTraceSummary {
1495            record_id: Some(receipt.record_id.clone()),
1496            dry_run: Some(receipt.dry_run),
1497            ..OperationTraceSummary::default()
1498        },
1499        None,
1500        correlation_id,
1501    );
1502    Ok(Json(receipt))
1503}
1504
1505async fn recover_http<S>(
1506    store: Arc<S>,
1507    request: RecoverHttpRequest,
1508    headers: HeaderMap,
1509    runtime: ServerRuntime,
1510) -> Result<Json<RecoverReceipt>, (StatusCode, Json<HttpErrorBody>)>
1511where
1512    S: MemoryStore + 'static,
1513{
1514    runtime
1515        .metrics()
1516        .http_recover
1517        .fetch_add(1, Ordering::Relaxed);
1518    let started_at_unix_ms = now_unix_ms();
1519    let correlation_id = runtime.traces().next_id("corr");
1520    let _permit = runtime
1521        .admission()
1522        .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1523        .await
1524        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1525    let receipt = store
1526        .recover(RecoverRequest {
1527            tenant_id: request.tenant_id.clone(),
1528            namespace: request.namespace.clone(),
1529            record_id: request.record_id,
1530            dry_run: request.dry_run,
1531            audit_reason: request.audit_reason,
1532            quality_state: request.quality_state,
1533            historical_state: request.historical_state,
1534        })
1535        .await
1536        .map_err(map_http_store_error)?;
1537    record_trace(
1538        &runtime,
1539        TraceOperationKind::Recover,
1540        "http",
1541        Some(request.tenant_id),
1542        Some(request.namespace),
1543        http_principal(&headers),
1544        started_at_unix_ms,
1545        TraceStatus::Ok,
1546        None,
1547        OperationTraceSummary {
1548            record_id: Some(receipt.record_id.clone()),
1549            dry_run: Some(receipt.dry_run),
1550            ..OperationTraceSummary::default()
1551        },
1552        None,
1553        correlation_id,
1554    );
1555    Ok(Json(receipt))
1556}
1557
1558async fn traces_http(
1559    request: TraceListRequest,
1560    runtime: ServerRuntime,
1561) -> Result<Json<Vec<OperationTrace>>, (StatusCode, Json<HttpErrorBody>)> {
1562    runtime
1563        .metrics()
1564        .http_traces
1565        .fetch_add(1, Ordering::Relaxed);
1566    runtime
1567        .metrics()
1568        .trace_reads
1569        .fetch_add(1, Ordering::Relaxed);
1570    Ok(Json(runtime.traces().list(&request)))
1571}
1572
1573async fn trace_http(
1574    trace_id: String,
1575    runtime: ServerRuntime,
1576) -> Result<Json<OperationTrace>, (StatusCode, Json<HttpErrorBody>)> {
1577    runtime
1578        .metrics()
1579        .http_traces
1580        .fetch_add(1, Ordering::Relaxed);
1581    runtime
1582        .metrics()
1583        .trace_reads
1584        .fetch_add(1, Ordering::Relaxed);
1585    runtime.traces().get(&trace_id).map(Json).ok_or_else(|| {
1586        (
1587            StatusCode::NOT_FOUND,
1588            Json(HttpErrorBody {
1589                error: format!("trace {trace_id} not found"),
1590            }),
1591        )
1592    })
1593}
1594
1595async fn runtime_status_http(runtime: ServerRuntime) -> Json<ServerRuntimeStatus> {
1596    runtime
1597        .metrics()
1598        .http_runtime
1599        .fetch_add(1, Ordering::Relaxed);
1600    Json(ServerRuntimeStatus {
1601        backend: runtime.backend().as_ref().clone(),
1602        admission: runtime.admission().snapshot(),
1603        traces: runtime.traces().snapshot(),
1604    })
1605}
1606
1607async fn export_http<S>(
1608    store: Arc<S>,
1609    request: ExportRequest,
1610    headers: HeaderMap,
1611    runtime: ServerRuntime,
1612) -> Result<Json<PortableStorePackage>, (StatusCode, Json<HttpErrorBody>)>
1613where
1614    S: MemoryStore + 'static,
1615{
1616    runtime
1617        .metrics()
1618        .http_export
1619        .fetch_add(1, Ordering::Relaxed);
1620    let started_at_unix_ms = now_unix_ms();
1621    let correlation_id = runtime.traces().next_id("corr");
1622    let _permit = runtime
1623        .admission()
1624        .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1625        .await
1626        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1627    let package = store
1628        .export(request.clone())
1629        .await
1630        .map_err(map_http_store_error)?;
1631    record_trace(
1632        &runtime,
1633        TraceOperationKind::Export,
1634        "http",
1635        request.tenant_id,
1636        request.namespace,
1637        http_principal(&headers),
1638        started_at_unix_ms,
1639        TraceStatus::Ok,
1640        None,
1641        OperationTraceSummary::default(),
1642        None,
1643        correlation_id,
1644    );
1645    Ok(Json(package))
1646}
1647
1648async fn import_http<S>(
1649    store: Arc<S>,
1650    request: ImportRequest,
1651    headers: HeaderMap,
1652    runtime: ServerRuntime,
1653) -> Result<Json<ImportReport>, (StatusCode, Json<HttpErrorBody>)>
1654where
1655    S: MemoryStore + 'static,
1656{
1657    runtime
1658        .metrics()
1659        .http_import
1660        .fetch_add(1, Ordering::Relaxed);
1661    let started_at_unix_ms = now_unix_ms();
1662    let correlation_id = runtime.traces().next_id("corr");
1663    let tenant_id = request
1664        .package
1665        .records
1666        .first()
1667        .map(|entry| entry.record.scope.tenant_id.clone());
1668    let namespace = request
1669        .package
1670        .records
1671        .first()
1672        .map(|entry| entry.record.scope.namespace.clone());
1673    let _permit = runtime
1674        .admission()
1675        .acquire(AdmissionClass::Admin, tenant_id.as_deref())
1676        .await
1677        .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1678    let report = store
1679        .import(request.clone())
1680        .await
1681        .map_err(map_http_store_error)?;
1682    record_trace(
1683        &runtime,
1684        TraceOperationKind::Import,
1685        "http",
1686        tenant_id,
1687        namespace,
1688        http_principal(&headers),
1689        started_at_unix_ms,
1690        if report.failed_records.is_empty() {
1691            TraceStatus::Ok
1692        } else {
1693            TraceStatus::Error
1694        },
1695        (!report.failed_records.is_empty())
1696            .then(|| format!("{} import validation failures", report.failed_records.len())),
1697        OperationTraceSummary {
1698            request_count: Some(request.package.records.len() as u32),
1699            dry_run: Some(request.dry_run),
1700            ..OperationTraceSummary::default()
1701        },
1702        None,
1703        correlation_id,
1704    );
1705    Ok(Json(report))
1706}
1707
1708async fn metrics_http(metrics: Arc<ServerMetrics>) -> impl IntoResponse {
1709    metrics.http_metrics.fetch_add(1, Ordering::Relaxed);
1710    (
1711        [(
1712            axum::http::header::CONTENT_TYPE,
1713            "text/plain; version=0.0.4",
1714        )],
1715        metrics.render(),
1716    )
1717}
1718
1719async fn enforce_http_guardrails(
1720    request: AxumRequest,
1721    next: Next,
1722    limits: Arc<ServerLimits>,
1723    metrics: Arc<ServerMetrics>,
1724    auth: Arc<AuthConfig>,
1725) -> Result<axum::response::Response, StatusCode> {
1726    let path = request.uri().path().to_string();
1727    if let Some(content_length) = request.headers().get(axum::http::header::CONTENT_LENGTH)
1728        && let Ok(content_length) = content_length.to_str()
1729        && let Ok(content_length) = content_length.parse::<usize>()
1730        && content_length > limits.max_http_body_bytes
1731    {
1732        metrics
1733            .http_rejected_body_too_large
1734            .fetch_add(1, Ordering::Relaxed);
1735        return Err(StatusCode::PAYLOAD_TOO_LARGE);
1736    }
1737    validate_http_auth(&request, &path, auth.as_ref())?;
1738    Ok(next.run(request).await)
1739}
1740
1741fn http_principal(headers: &HeaderMap) -> Option<String> {
1742    headers
1743        .get(axum::http::header::AUTHORIZATION)
1744        .and_then(|value| value.to_str().ok())
1745        .map(|_| "bearer".to_string())
1746}
1747
1748fn grpc_principal<T>(request: &Request<T>) -> Option<String> {
1749    request
1750        .metadata()
1751        .get("authorization")
1752        .and_then(|value| value.to_str().ok())
1753        .map(|_| "bearer".to_string())
1754}
1755
1756fn map_http_admission_error(
1757    metrics: &ServerMetrics,
1758    error: AdmissionError,
1759) -> (StatusCode, Json<HttpErrorBody>) {
1760    match error {
1761        AdmissionError::QueueFull | AdmissionError::TenantBusy => {
1762            metrics.admission_rejected.fetch_add(1, Ordering::Relaxed);
1763            (
1764                StatusCode::TOO_MANY_REQUESTS,
1765                Json(HttpErrorBody {
1766                    error: "request admission rejected".to_string(),
1767                }),
1768            )
1769        }
1770        AdmissionError::TimedOut => {
1771            metrics.admission_timed_out.fetch_add(1, Ordering::Relaxed);
1772            (
1773                StatusCode::TOO_MANY_REQUESTS,
1774                Json(HttpErrorBody {
1775                    error: "request admission timed out".to_string(),
1776                }),
1777            )
1778        }
1779    }
1780}
1781
1782fn map_grpc_admission_error(metrics: &ServerMetrics, error: AdmissionError) -> Status {
1783    match error {
1784        AdmissionError::QueueFull | AdmissionError::TenantBusy => {
1785            metrics.admission_rejected.fetch_add(1, Ordering::Relaxed);
1786            Status::resource_exhausted("request admission rejected")
1787        }
1788        AdmissionError::TimedOut => {
1789            metrics.admission_timed_out.fetch_add(1, Ordering::Relaxed);
1790            Status::resource_exhausted("request admission timed out")
1791        }
1792    }
1793}
1794
1795fn attach_correlation_id(result: &mut RecallResult, correlation_id: &str) {
1796    if let Some(explanation) = result.explanation.as_mut() {
1797        if let Some(existing) = explanation.trace_id.replace(correlation_id.to_string()) {
1798            explanation
1799                .policy_notes
1800                .push(format!("planning_trace_id={existing}"));
1801        }
1802        explanation
1803            .policy_notes
1804            .push(format!("correlation_id={correlation_id}"));
1805    }
1806    for hit in &mut result.hits {
1807        if let Some(explanation) = hit.explanation.as_mut() {
1808            if let Some(existing) = explanation.trace_id.replace(correlation_id.to_string()) {
1809                explanation
1810                    .policy_notes
1811                    .push(format!("planning_trace_id={existing}"));
1812            }
1813            explanation
1814                .policy_notes
1815                .push(format!("correlation_id={correlation_id}"));
1816        }
1817    }
1818}
1819
1820#[allow(clippy::too_many_arguments)]
1821fn record_trace(
1822    runtime: &ServerRuntime,
1823    operation: TraceOperationKind,
1824    transport: &str,
1825    tenant_id: Option<String>,
1826    namespace: Option<String>,
1827    principal: Option<String>,
1828    started_at_unix_ms: u64,
1829    status: TraceStatus,
1830    status_message: Option<String>,
1831    summary: OperationTraceSummary,
1832    recall_explanation: Option<RecallExplanation>,
1833    correlation_id: String,
1834) {
1835    let completed_at_unix_ms = now_unix_ms();
1836    let admission_class = admission_class_for_operation(operation.clone()).to_string();
1837    let planning_trace_id = recall_explanation.as_ref().and_then(|value| {
1838        value
1839            .planning_trace
1840            .as_ref()
1841            .map(|trace| trace.trace_id.clone())
1842    });
1843    let store_span_id = runtime.traces().next_id("store-span");
1844    let evicted = runtime.traces().record(OperationTrace {
1845        trace_id: runtime.traces().next_id("trace"),
1846        correlation_id,
1847        operation,
1848        transport: transport.to_string(),
1849        backend: Some(runtime.backend().as_ref().clone()),
1850        admission_class: Some(admission_class),
1851        tenant_id,
1852        namespace,
1853        principal,
1854        store_span_id: Some(store_span_id),
1855        planning_trace_id,
1856        started_at_unix_ms,
1857        completed_at_unix_ms,
1858        latency_ms: completed_at_unix_ms.saturating_sub(started_at_unix_ms),
1859        status,
1860        status_message,
1861        summary,
1862        recall_explanation,
1863    });
1864    runtime
1865        .metrics()
1866        .trace_records
1867        .fetch_add(1, Ordering::Relaxed);
1868    if evicted {
1869        runtime
1870            .metrics()
1871            .trace_evictions
1872            .fetch_add(1, Ordering::Relaxed);
1873    }
1874}
1875
1876fn admission_class_for_operation(operation: TraceOperationKind) -> &'static str {
1877    match operation {
1878        TraceOperationKind::Recall | TraceOperationKind::Snapshot | TraceOperationKind::Stats => {
1879            "read"
1880        }
1881        TraceOperationKind::Upsert
1882        | TraceOperationKind::BatchUpsert
1883        | TraceOperationKind::Compact
1884        | TraceOperationKind::Delete
1885        | TraceOperationKind::Archive
1886        | TraceOperationKind::Suppress
1887        | TraceOperationKind::Recover => "write",
1888        TraceOperationKind::IntegrityCheck
1889        | TraceOperationKind::Repair
1890        | TraceOperationKind::Export
1891        | TraceOperationKind::Import => "admin",
1892    }
1893}
1894
1895fn validate_http_auth(
1896    request: &AxumRequest,
1897    path: &str,
1898    auth: &AuthConfig,
1899) -> Result<(), StatusCode> {
1900    if path == "/healthz" || path == "/readyz" || (path == "/metrics" && !auth.protect_metrics) {
1901        return Ok(());
1902    }
1903    let provided = request
1904        .headers()
1905        .get(axum::http::header::AUTHORIZATION)
1906        .and_then(|value| value.to_str().ok());
1907    let permission = match path {
1908        "/metrics" => AuthPermission::Metrics,
1909        "/memory/recall" => AuthPermission::Read,
1910        "/memory/upsert" | "/memory/batch-upsert" => AuthPermission::Write,
1911        "/admin/stats" | "/admin/integrity" | "/admin/repair" | "/admin/snapshot"
1912        | "/admin/compact" | "/admin/delete" | "/admin/archive" | "/admin/suppress"
1913        | "/admin/recover" | "/admin/runtime" | "/admin/export" | "/admin/import" => {
1914            AuthPermission::Admin
1915        }
1916        _ => AuthPermission::Admin,
1917    };
1918    if path.starts_with("/admin/traces") {
1919        return auth
1920            .authorize(provided, AuthPermission::Admin)
1921            .then_some(())
1922            .ok_or(StatusCode::UNAUTHORIZED);
1923    }
1924    if auth.authorize(provided, permission) {
1925        Ok(())
1926    } else {
1927        Err(StatusCode::UNAUTHORIZED)
1928    }
1929}
1930
1931fn validate_grpc_auth<T>(
1932    request: &Request<T>,
1933    auth: &AuthConfig,
1934    permission: AuthPermission,
1935) -> Result<(), Status> {
1936    let provided = request
1937        .metadata()
1938        .get("authorization")
1939        .and_then(|value| value.to_str().ok());
1940    if auth.authorize(provided, permission) {
1941        Ok(())
1942    } else {
1943        Err(Status::unauthenticated("missing or invalid bearer token"))
1944    }
1945}
1946
1947fn bearer_token_from_header(header: Option<&str>) -> Option<&str> {
1948    header.and_then(|value| value.strip_prefix("Bearer "))
1949}
1950
1951fn invalid_argument(message: impl Into<String>) -> Status {
1952    Status::invalid_argument(message.into())
1953}
1954
1955fn internal_status(message: impl Into<String>) -> Status {
1956    Status::internal(message.into())
1957}
1958
1959fn trust_level_from_proto(value: &str) -> Result<MemoryTrustLevel, Status> {
1960    match value {
1961        "" | "derived" => Ok(MemoryTrustLevel::Derived),
1962        "untrusted" => Ok(MemoryTrustLevel::Untrusted),
1963        "observed" => Ok(MemoryTrustLevel::Observed),
1964        "verified" => Ok(MemoryTrustLevel::Verified),
1965        "pinned" => Ok(MemoryTrustLevel::Pinned),
1966        other => Err(invalid_argument(format!("unknown trust level: {other}"))),
1967    }
1968}
1969
1970fn trust_level_to_proto(value: MemoryTrustLevel) -> String {
1971    match value {
1972        MemoryTrustLevel::Untrusted => "untrusted",
1973        MemoryTrustLevel::Observed => "observed",
1974        MemoryTrustLevel::Derived => "derived",
1975        MemoryTrustLevel::Verified => "verified",
1976        MemoryTrustLevel::Pinned => "pinned",
1977    }
1978    .to_string()
1979}
1980
1981fn record_kind_from_proto(value: &str) -> Result<MemoryRecordKind, Status> {
1982    match value {
1983        "episodic" => Ok(MemoryRecordKind::Episodic),
1984        "summary" => Ok(MemoryRecordKind::Summary),
1985        "fact" => Ok(MemoryRecordKind::Fact),
1986        "preference" => Ok(MemoryRecordKind::Preference),
1987        "task" => Ok(MemoryRecordKind::Task),
1988        "artifact" => Ok(MemoryRecordKind::Artifact),
1989        "hypothesis" => Ok(MemoryRecordKind::Hypothesis),
1990        other => Err(invalid_argument(format!("unknown record kind: {other}"))),
1991    }
1992}
1993
1994fn record_kind_to_proto(value: MemoryRecordKind) -> String {
1995    match value {
1996        MemoryRecordKind::Episodic => "episodic",
1997        MemoryRecordKind::Summary => "summary",
1998        MemoryRecordKind::Fact => "fact",
1999        MemoryRecordKind::Preference => "preference",
2000        MemoryRecordKind::Task => "task",
2001        MemoryRecordKind::Artifact => "artifact",
2002        MemoryRecordKind::Hypothesis => "hypothesis",
2003    }
2004    .to_string()
2005}
2006
2007fn quality_state_from_proto(value: &str) -> Result<MemoryQualityState, Status> {
2008    match value {
2009        "draft" => Ok(MemoryQualityState::Draft),
2010        "active" => Ok(MemoryQualityState::Active),
2011        "verified" => Ok(MemoryQualityState::Verified),
2012        "archived" => Ok(MemoryQualityState::Archived),
2013        "suppressed" => Ok(MemoryQualityState::Suppressed),
2014        "deleted" => Ok(MemoryQualityState::Deleted),
2015        other => Err(invalid_argument(format!("unknown quality state: {other}"))),
2016    }
2017}
2018
2019fn quality_state_to_proto(value: MemoryQualityState) -> String {
2020    match value {
2021        MemoryQualityState::Draft => "draft",
2022        MemoryQualityState::Active => "active",
2023        MemoryQualityState::Verified => "verified",
2024        MemoryQualityState::Archived => "archived",
2025        MemoryQualityState::Suppressed => "suppressed",
2026        MemoryQualityState::Deleted => "deleted",
2027    }
2028    .to_string()
2029}
2030
2031fn scope_from_proto(scope: ProtoMemoryScope) -> Result<MemoryScope, Status> {
2032    Ok(MemoryScope {
2033        tenant_id: scope.tenant_id,
2034        namespace: scope.namespace,
2035        actor_id: scope.actor_id,
2036        conversation_id: scope.conversation_id,
2037        session_id: scope.session_id,
2038        source: scope.source,
2039        labels: scope.labels,
2040        trust_level: trust_level_from_proto(&scope.trust_level)?,
2041    })
2042}
2043
2044fn scope_to_proto(scope: MemoryScope) -> ProtoMemoryScope {
2045    ProtoMemoryScope {
2046        tenant_id: scope.tenant_id,
2047        namespace: scope.namespace,
2048        actor_id: scope.actor_id,
2049        conversation_id: scope.conversation_id,
2050        session_id: scope.session_id,
2051        source: scope.source,
2052        labels: scope.labels,
2053        trust_level: trust_level_to_proto(scope.trust_level),
2054    }
2055}
2056
2057fn artifact_from_proto(value: ProtoArtifactPointer) -> ArtifactPointer {
2058    ArtifactPointer {
2059        uri: value.uri,
2060        media_type: value.media_type,
2061        checksum: value.checksum,
2062    }
2063}
2064
2065fn artifact_to_proto(value: ArtifactPointer) -> ProtoArtifactPointer {
2066    ProtoArtifactPointer {
2067        uri: value.uri,
2068        media_type: value.media_type,
2069        checksum: value.checksum,
2070    }
2071}
2072
2073fn continuity_state_from_proto(value: &str) -> Result<EpisodeContinuityState, Status> {
2074    match value {
2075        "" | "open" => Ok(EpisodeContinuityState::Open),
2076        "resolved" => Ok(EpisodeContinuityState::Resolved),
2077        "superseded" => Ok(EpisodeContinuityState::Superseded),
2078        "abandoned" => Ok(EpisodeContinuityState::Abandoned),
2079        other => Err(invalid_argument(format!(
2080            "unknown continuity state: {other}"
2081        ))),
2082    }
2083}
2084
2085fn continuity_state_to_proto(value: EpisodeContinuityState) -> String {
2086    match value {
2087        EpisodeContinuityState::Open => "open",
2088        EpisodeContinuityState::Resolved => "resolved",
2089        EpisodeContinuityState::Superseded => "superseded",
2090        EpisodeContinuityState::Abandoned => "abandoned",
2091    }
2092    .to_string()
2093}
2094
2095fn affective_provenance_from_proto(value: &str) -> Result<AffectiveAnnotationProvenance, Status> {
2096    match value {
2097        "" | "authored" => Ok(AffectiveAnnotationProvenance::Authored),
2098        "imported" => Ok(AffectiveAnnotationProvenance::Imported),
2099        "derived" => Ok(AffectiveAnnotationProvenance::Derived),
2100        other => Err(invalid_argument(format!(
2101            "unknown affective provenance: {other}"
2102        ))),
2103    }
2104}
2105
2106fn affective_provenance_to_proto(value: AffectiveAnnotationProvenance) -> String {
2107    match value {
2108        AffectiveAnnotationProvenance::Authored => "authored",
2109        AffectiveAnnotationProvenance::Imported => "imported",
2110        AffectiveAnnotationProvenance::Derived => "derived",
2111    }
2112    .to_string()
2113}
2114
2115fn temporal_order_from_proto(value: &str) -> Result<RecallTemporalOrder, Status> {
2116    match value {
2117        "" | "relevance" => Ok(RecallTemporalOrder::Relevance),
2118        "chronological_asc" => Ok(RecallTemporalOrder::ChronologicalAsc),
2119        "chronological_desc" => Ok(RecallTemporalOrder::ChronologicalDesc),
2120        other => Err(invalid_argument(format!("unknown temporal order: {other}"))),
2121    }
2122}
2123
2124fn planning_profile_from_proto(value: &str) -> Result<RecallPlanningProfile, Status> {
2125    match value {
2126        "" | "fast_path" => Ok(RecallPlanningProfile::FastPath),
2127        "continuity_aware" => Ok(RecallPlanningProfile::ContinuityAware),
2128        other => Err(invalid_argument(format!(
2129            "unknown planning profile: {other}"
2130        ))),
2131    }
2132}
2133
2134fn planning_profile_to_proto(value: RecallPlanningProfile) -> String {
2135    match value {
2136        RecallPlanningProfile::FastPath => "fast_path",
2137        RecallPlanningProfile::ContinuityAware => "continuity_aware",
2138    }
2139    .to_string()
2140}
2141
2142fn planner_stage_to_proto(value: RecallPlannerStage) -> String {
2143    match value {
2144        RecallPlannerStage::CandidateGeneration => "candidate_generation",
2145        RecallPlannerStage::GraphExpansion => "graph_expansion",
2146        RecallPlannerStage::Selection => "selection",
2147    }
2148    .to_string()
2149}
2150
2151fn candidate_source_to_proto(value: RecallCandidateSource) -> String {
2152    match value {
2153        RecallCandidateSource::Lexical => "lexical",
2154        RecallCandidateSource::Semantic => "semantic",
2155        RecallCandidateSource::Metadata => "metadata",
2156        RecallCandidateSource::Episode => "episode",
2157        RecallCandidateSource::Graph => "graph",
2158        RecallCandidateSource::Temporal => "temporal",
2159        RecallCandidateSource::Provenance => "provenance",
2160    }
2161    .to_string()
2162}
2163
2164fn affective_annotation_from_proto(
2165    value: ProtoAffectiveAnnotation,
2166) -> Result<AffectiveAnnotation, Status> {
2167    Ok(AffectiveAnnotation {
2168        tone: value.tone,
2169        sentiment: value.sentiment,
2170        urgency: value.urgency,
2171        confidence: value.confidence,
2172        tension: value.tension,
2173        provenance: affective_provenance_from_proto(&value.provenance)?,
2174    })
2175}
2176
2177fn affective_annotation_to_proto(value: AffectiveAnnotation) -> ProtoAffectiveAnnotation {
2178    ProtoAffectiveAnnotation {
2179        tone: value.tone,
2180        sentiment: value.sentiment,
2181        urgency: value.urgency,
2182        confidence: value.confidence,
2183        tension: value.tension,
2184        provenance: affective_provenance_to_proto(value.provenance),
2185    }
2186}
2187
2188fn historical_state_from_proto(value: &str) -> Result<MemoryHistoricalState, Status> {
2189    match value {
2190        "" | "current" => Ok(MemoryHistoricalState::Current),
2191        "historical" => Ok(MemoryHistoricalState::Historical),
2192        "superseded" => Ok(MemoryHistoricalState::Superseded),
2193        other => Err(invalid_argument(format!(
2194            "unknown historical state: {other}"
2195        ))),
2196    }
2197}
2198
2199fn historical_state_to_proto(value: MemoryHistoricalState) -> String {
2200    match value {
2201        MemoryHistoricalState::Current => "current",
2202        MemoryHistoricalState::Historical => "historical",
2203        MemoryHistoricalState::Superseded => "superseded",
2204    }
2205    .to_string()
2206}
2207
2208fn lineage_relation_from_proto(value: &str) -> Result<LineageRelationKind, Status> {
2209    match value {
2210        "" | "derived_from" => Ok(LineageRelationKind::DerivedFrom),
2211        "consolidated_from" => Ok(LineageRelationKind::ConsolidatedFrom),
2212        "supersedes" => Ok(LineageRelationKind::Supersedes),
2213        "superseded_by" => Ok(LineageRelationKind::SupersededBy),
2214        "conflicts_with" => Ok(LineageRelationKind::ConflictsWith),
2215        other => Err(invalid_argument(format!(
2216            "unknown lineage relation: {other}"
2217        ))),
2218    }
2219}
2220
2221fn lineage_relation_to_proto(value: LineageRelationKind) -> String {
2222    match value {
2223        LineageRelationKind::DerivedFrom => "derived_from",
2224        LineageRelationKind::ConsolidatedFrom => "consolidated_from",
2225        LineageRelationKind::Supersedes => "supersedes",
2226        LineageRelationKind::SupersededBy => "superseded_by",
2227        LineageRelationKind::ConflictsWith => "conflicts_with",
2228    }
2229    .to_string()
2230}
2231
2232fn lineage_link_from_proto(value: ProtoLineageLink) -> Result<LineageLink, Status> {
2233    Ok(LineageLink {
2234        record_id: value.record_id,
2235        relation: lineage_relation_from_proto(&value.relation)?,
2236        confidence: value.confidence,
2237    })
2238}
2239
2240fn lineage_link_to_proto(value: LineageLink) -> ProtoLineageLink {
2241    ProtoLineageLink {
2242        record_id: value.record_id,
2243        relation: lineage_relation_to_proto(value.relation),
2244        confidence: value.confidence,
2245    }
2246}
2247
2248fn conflict_review_state_from_proto(value: &str) -> Result<ConflictReviewState, Status> {
2249    match value {
2250        "" | "none" => Ok(ConflictReviewState::None),
2251        "potential_conflict" => Ok(ConflictReviewState::PotentialConflict),
2252        "under_review" => Ok(ConflictReviewState::UnderReview),
2253        "resolved" => Ok(ConflictReviewState::Resolved),
2254        "dismissed" => Ok(ConflictReviewState::Dismissed),
2255        other => Err(invalid_argument(format!(
2256            "unknown conflict review state: {other}"
2257        ))),
2258    }
2259}
2260
2261fn conflict_review_state_to_proto(value: ConflictReviewState) -> String {
2262    match value {
2263        ConflictReviewState::None => "none",
2264        ConflictReviewState::PotentialConflict => "potential_conflict",
2265        ConflictReviewState::UnderReview => "under_review",
2266        ConflictReviewState::Resolved => "resolved",
2267        ConflictReviewState::Dismissed => "dismissed",
2268    }
2269    .to_string()
2270}
2271
2272fn conflict_resolution_kind_from_proto(value: &str) -> Result<ConflictResolutionKind, Status> {
2273    match value {
2274        "" | "none" => Ok(ConflictResolutionKind::None),
2275        "accepted" => Ok(ConflictResolutionKind::Accepted),
2276        "rejected" => Ok(ConflictResolutionKind::Rejected),
2277        "superseded" => Ok(ConflictResolutionKind::Superseded),
2278        "merged" => Ok(ConflictResolutionKind::Merged),
2279        other => Err(invalid_argument(format!(
2280            "unknown conflict resolution kind: {other}"
2281        ))),
2282    }
2283}
2284
2285fn conflict_resolution_kind_to_proto(value: ConflictResolutionKind) -> String {
2286    match value {
2287        ConflictResolutionKind::None => "none",
2288        ConflictResolutionKind::Accepted => "accepted",
2289        ConflictResolutionKind::Rejected => "rejected",
2290        ConflictResolutionKind::Superseded => "superseded",
2291        ConflictResolutionKind::Merged => "merged",
2292    }
2293    .to_string()
2294}
2295
2296fn conflict_annotation_from_proto(
2297    value: ProtoConflictAnnotation,
2298) -> Result<ConflictAnnotation, Status> {
2299    Ok(ConflictAnnotation {
2300        state: conflict_review_state_from_proto(&value.state)?,
2301        conflicting_record_ids: value.conflicting_record_ids,
2302        drift_score: value.drift_score,
2303        resolution: conflict_resolution_kind_from_proto(&value.resolution)?,
2304        resolved_by: value.resolved_by,
2305        resolved_at_unix_ms: value.resolved_at_unix_ms,
2306        note: value.note,
2307    })
2308}
2309
2310fn conflict_annotation_to_proto(value: ConflictAnnotation) -> ProtoConflictAnnotation {
2311    ProtoConflictAnnotation {
2312        state: conflict_review_state_to_proto(value.state),
2313        conflicting_record_ids: value.conflicting_record_ids,
2314        drift_score: value.drift_score,
2315        resolution: conflict_resolution_kind_to_proto(value.resolution),
2316        resolved_by: value.resolved_by,
2317        resolved_at_unix_ms: value.resolved_at_unix_ms,
2318        note: value.note,
2319    }
2320}
2321
2322fn historical_mode_from_proto(value: &str) -> Result<RecallHistoricalMode, Status> {
2323    match value {
2324        "" | "current_only" => Ok(RecallHistoricalMode::CurrentOnly),
2325        "include_historical" => Ok(RecallHistoricalMode::IncludeHistorical),
2326        "historical_only" => Ok(RecallHistoricalMode::HistoricalOnly),
2327        other => Err(invalid_argument(format!(
2328            "unknown historical mode: {other}"
2329        ))),
2330    }
2331}
2332
2333fn episode_context_from_proto(value: ProtoEpisodeContext) -> Result<EpisodeContext, Status> {
2334    Ok(EpisodeContext {
2335        schema_version: if value.schema_version == 0 {
2336            EPISODE_SCHEMA_VERSION
2337        } else {
2338            value.schema_version
2339        },
2340        episode_id: value.episode_id,
2341        summary: value.summary,
2342        continuity_state: continuity_state_from_proto(&value.continuity_state)?,
2343        actor_ids: value.actor_ids,
2344        goal: value.goal,
2345        outcome: value.outcome,
2346        started_at_unix_ms: value.started_at_unix_ms,
2347        ended_at_unix_ms: value.ended_at_unix_ms,
2348        last_active_unix_ms: value.last_active_unix_ms,
2349        recurrence_key: value.recurrence_key,
2350        recurrence_interval_ms: value.recurrence_interval_ms,
2351        boundary_label: value.boundary_label,
2352        previous_record_id: value.previous_record_id,
2353        next_record_id: value.next_record_id,
2354        causal_record_ids: value.causal_record_ids,
2355        related_record_ids: value.related_record_ids,
2356        linked_artifact_uris: value.linked_artifact_uris,
2357        salience: value
2358            .salience
2359            .map_or_else(EpisodeSalience::default, |salience| EpisodeSalience {
2360                reuse_count: salience.reuse_count,
2361                novelty_score: salience.novelty_score,
2362                goal_relevance: salience.goal_relevance,
2363                unresolved_weight: salience.unresolved_weight,
2364            }),
2365        affective: value
2366            .affective
2367            .map(affective_annotation_from_proto)
2368            .transpose()?,
2369    })
2370}
2371
2372fn episode_context_to_proto(value: EpisodeContext) -> ProtoEpisodeContext {
2373    ProtoEpisodeContext {
2374        schema_version: value.schema_version,
2375        episode_id: value.episode_id,
2376        summary: value.summary,
2377        continuity_state: continuity_state_to_proto(value.continuity_state),
2378        actor_ids: value.actor_ids,
2379        goal: value.goal,
2380        outcome: value.outcome,
2381        started_at_unix_ms: value.started_at_unix_ms,
2382        ended_at_unix_ms: value.ended_at_unix_ms,
2383        last_active_unix_ms: value.last_active_unix_ms,
2384        recurrence_key: value.recurrence_key,
2385        recurrence_interval_ms: value.recurrence_interval_ms,
2386        boundary_label: value.boundary_label,
2387        previous_record_id: value.previous_record_id,
2388        next_record_id: value.next_record_id,
2389        causal_record_ids: value.causal_record_ids,
2390        related_record_ids: value.related_record_ids,
2391        linked_artifact_uris: value.linked_artifact_uris,
2392        salience: Some(ProtoEpisodeSalience {
2393            reuse_count: value.salience.reuse_count,
2394            novelty_score: value.salience.novelty_score,
2395            goal_relevance: value.salience.goal_relevance,
2396            unresolved_weight: value.salience.unresolved_weight,
2397        }),
2398        affective: value.affective.map(affective_annotation_to_proto),
2399    }
2400}
2401
2402fn record_from_proto(record: ProtoMemoryRecord) -> Result<MemoryRecord, Status> {
2403    Ok(MemoryRecord {
2404        id: record.id,
2405        scope: scope_from_proto(
2406            record
2407                .scope
2408                .ok_or_else(|| invalid_argument("memory record scope is required"))?,
2409        )?,
2410        kind: record_kind_from_proto(&record.kind)?,
2411        content: record.content,
2412        summary: record.summary,
2413        source_id: record.source_id,
2414        metadata: record.metadata.into_iter().collect(),
2415        quality_state: quality_state_from_proto(&record.quality_state)?,
2416        created_at_unix_ms: record.created_at_unix_ms,
2417        updated_at_unix_ms: record.updated_at_unix_ms,
2418        expires_at_unix_ms: record.expires_at_unix_ms,
2419        importance_score: record.importance_score,
2420        artifact: record.artifact.map(artifact_from_proto),
2421        episode: record.episode.map(episode_context_from_proto).transpose()?,
2422        historical_state: historical_state_from_proto(
2423            record.historical_state.as_deref().unwrap_or(""),
2424        )?,
2425        lineage: record
2426            .lineage
2427            .into_iter()
2428            .map(lineage_link_from_proto)
2429            .collect::<Result<Vec<_>, _>>()?,
2430        conflict: record
2431            .conflict
2432            .map(conflict_annotation_from_proto)
2433            .transpose()?,
2434    })
2435}
2436
2437fn record_to_proto(record: MemoryRecord) -> ProtoMemoryRecord {
2438    ProtoMemoryRecord {
2439        id: record.id,
2440        scope: Some(scope_to_proto(record.scope)),
2441        kind: record_kind_to_proto(record.kind),
2442        content: record.content,
2443        summary: record.summary,
2444        metadata: record.metadata.into_iter().collect(),
2445        quality_state: quality_state_to_proto(record.quality_state),
2446        created_at_unix_ms: record.created_at_unix_ms,
2447        updated_at_unix_ms: record.updated_at_unix_ms,
2448        expires_at_unix_ms: record.expires_at_unix_ms,
2449        importance_score: record.importance_score,
2450        source_id: record.source_id,
2451        artifact: record.artifact.map(artifact_to_proto),
2452        episode: record.episode.map(episode_context_to_proto),
2453        historical_state: Some(historical_state_to_proto(record.historical_state)),
2454        lineage: record
2455            .lineage
2456            .into_iter()
2457            .map(lineage_link_to_proto)
2458            .collect(),
2459        conflict: record.conflict.map(conflict_annotation_to_proto),
2460    }
2461}
2462
2463fn recall_explanation_to_proto(value: RecallExplanation) -> ProtoRecallExplanation {
2464    ProtoRecallExplanation {
2465        selected_channels: value.selected_channels,
2466        policy_notes: value.policy_notes,
2467        trace_id: value.trace_id,
2468        planning_trace: value.planning_trace.map(recall_planning_trace_to_proto),
2469        scorer_kind: value
2470            .scorer_kind
2471            .map(recall_scorer_kind_to_proto)
2472            .unwrap_or(ProtoRecallScorerKind::Unspecified as i32),
2473        scoring_profile: value
2474            .scoring_profile
2475            .map(recall_scoring_profile_to_proto)
2476            .unwrap_or(ProtoRecallScoringProfile::Unspecified as i32),
2477        planning_profile: value.planning_profile.map(planning_profile_to_proto),
2478        policy_profile: value
2479            .policy_profile
2480            .map(recall_policy_profile_to_proto)
2481            .unwrap_or(ProtoRecallPolicyProfile::Unspecified as i32),
2482    }
2483}
2484
2485fn recall_scorer_kind_to_proto(value: RecallScorerKind) -> i32 {
2486    match value {
2487        RecallScorerKind::Profile => ProtoRecallScorerKind::Profile as i32,
2488        RecallScorerKind::Curated => ProtoRecallScorerKind::Curated as i32,
2489    }
2490}
2491
2492fn recall_scoring_profile_to_proto(value: RecallScoringProfile) -> i32 {
2493    match value {
2494        RecallScoringProfile::Balanced => ProtoRecallScoringProfile::Balanced as i32,
2495        RecallScoringProfile::LexicalFirst => ProtoRecallScoringProfile::LexicalFirst as i32,
2496        RecallScoringProfile::ImportanceFirst => ProtoRecallScoringProfile::ImportanceFirst as i32,
2497    }
2498}
2499
2500fn recall_policy_profile_to_proto(value: RecallPolicyProfile) -> i32 {
2501    match value {
2502        RecallPolicyProfile::General => ProtoRecallPolicyProfile::General as i32,
2503        RecallPolicyProfile::Support => ProtoRecallPolicyProfile::Support as i32,
2504        RecallPolicyProfile::Research => ProtoRecallPolicyProfile::Research as i32,
2505        RecallPolicyProfile::Assistant => ProtoRecallPolicyProfile::Assistant as i32,
2506        RecallPolicyProfile::AutonomousAgent => ProtoRecallPolicyProfile::AutonomousAgent as i32,
2507    }
2508}
2509
2510fn embedding_provider_kind_to_proto(value: EmbeddingProviderKind) -> i32 {
2511    match value {
2512        EmbeddingProviderKind::Disabled => ProtoEmbeddingProviderKind::Disabled as i32,
2513        EmbeddingProviderKind::DeterministicLocal => {
2514            ProtoEmbeddingProviderKind::DeterministicLocal as i32
2515        }
2516    }
2517}
2518
2519fn engine_tuning_info_to_proto(value: EngineTuningInfo) -> ProtoEngineTuningInfo {
2520    ProtoEngineTuningInfo {
2521        recall_scorer_kind: recall_scorer_kind_to_proto(value.recall_scorer_kind),
2522        recall_scoring_profile: recall_scoring_profile_to_proto(value.recall_scoring_profile),
2523        embedding_provider_kind: embedding_provider_kind_to_proto(value.embedding_provider_kind),
2524        embedding_dimensions: value.embedding_dimensions as u64,
2525        graph_expansion_max_hops: u32::from(value.graph_expansion_max_hops),
2526        compaction_summarize_after_record_count: value.compaction_summarize_after_record_count
2527            as u64,
2528        compaction_cold_archive_after_days: value.compaction_cold_archive_after_days,
2529        compaction_cold_archive_importance_threshold_per_mille: u32::from(
2530            value.compaction_cold_archive_importance_threshold_per_mille,
2531        ),
2532        recall_planning_profile: Some(planning_profile_to_proto(value.recall_planning_profile)),
2533        recall_policy_profile: recall_policy_profile_to_proto(value.recall_policy_profile),
2534    }
2535}
2536
2537fn engine_tuning_info_from_proto(value: ProtoEngineTuningInfo) -> Result<EngineTuningInfo, Status> {
2538    Ok(EngineTuningInfo {
2539        recall_scorer_kind: match ProtoRecallScorerKind::try_from(value.recall_scorer_kind)
2540            .unwrap_or(ProtoRecallScorerKind::Unspecified)
2541        {
2542            ProtoRecallScorerKind::Profile => RecallScorerKind::Profile,
2543            ProtoRecallScorerKind::Curated => RecallScorerKind::Curated,
2544            ProtoRecallScorerKind::Unspecified => {
2545                return Err(invalid_argument("engine recall scorer kind is required"));
2546            }
2547        },
2548        recall_scoring_profile: match ProtoRecallScoringProfile::try_from(
2549            value.recall_scoring_profile,
2550        )
2551        .unwrap_or(ProtoRecallScoringProfile::Unspecified)
2552        {
2553            ProtoRecallScoringProfile::Balanced => RecallScoringProfile::Balanced,
2554            ProtoRecallScoringProfile::LexicalFirst => RecallScoringProfile::LexicalFirst,
2555            ProtoRecallScoringProfile::ImportanceFirst => RecallScoringProfile::ImportanceFirst,
2556            ProtoRecallScoringProfile::Unspecified => {
2557                return Err(invalid_argument(
2558                    "engine recall scoring profile is required",
2559                ));
2560            }
2561        },
2562        recall_planning_profile: planning_profile_from_proto(
2563            value.recall_planning_profile.as_deref().unwrap_or(""),
2564        )?,
2565        recall_policy_profile: match ProtoRecallPolicyProfile::try_from(value.recall_policy_profile)
2566            .unwrap_or(ProtoRecallPolicyProfile::Unspecified)
2567        {
2568            ProtoRecallPolicyProfile::General => RecallPolicyProfile::General,
2569            ProtoRecallPolicyProfile::Support => RecallPolicyProfile::Support,
2570            ProtoRecallPolicyProfile::Research => RecallPolicyProfile::Research,
2571            ProtoRecallPolicyProfile::Assistant => RecallPolicyProfile::Assistant,
2572            ProtoRecallPolicyProfile::AutonomousAgent => RecallPolicyProfile::AutonomousAgent,
2573            ProtoRecallPolicyProfile::Unspecified => RecallPolicyProfile::General,
2574        },
2575        embedding_provider_kind: match ProtoEmbeddingProviderKind::try_from(
2576            value.embedding_provider_kind,
2577        )
2578        .unwrap_or(ProtoEmbeddingProviderKind::Unspecified)
2579        {
2580            ProtoEmbeddingProviderKind::Disabled => EmbeddingProviderKind::Disabled,
2581            ProtoEmbeddingProviderKind::DeterministicLocal => {
2582                EmbeddingProviderKind::DeterministicLocal
2583            }
2584            ProtoEmbeddingProviderKind::Unspecified => {
2585                return Err(invalid_argument(
2586                    "engine embedding provider kind is required",
2587                ));
2588            }
2589        },
2590        embedding_dimensions: value.embedding_dimensions as usize,
2591        graph_expansion_max_hops: value.graph_expansion_max_hops as u8,
2592        compaction_summarize_after_record_count: value.compaction_summarize_after_record_count
2593            as usize,
2594        compaction_cold_archive_after_days: value.compaction_cold_archive_after_days,
2595        compaction_cold_archive_importance_threshold_per_mille: value
2596            .compaction_cold_archive_importance_threshold_per_mille
2597            as u16,
2598    })
2599}
2600
2601fn recall_planning_trace_to_proto(value: RecallPlanningTrace) -> ProtoRecallPlanningTrace {
2602    ProtoRecallPlanningTrace {
2603        trace_id: value.trace_id,
2604        token_budget_applied: value.token_budget_applied,
2605        candidates: value
2606            .candidates
2607            .into_iter()
2608            .map(recall_trace_candidate_to_proto)
2609            .collect(),
2610    }
2611}
2612
2613fn recall_trace_candidate_to_proto(value: RecallTraceCandidate) -> ProtoRecallTraceCandidate {
2614    ProtoRecallTraceCandidate {
2615        record_id: value.record_id,
2616        kind: record_kind_to_proto(value.kind),
2617        selected: value.selected,
2618        matched_terms: value.matched_terms,
2619        decision_reason: value.decision_reason,
2620        breakdown: Some(recall_score_breakdown_to_proto(value.breakdown)),
2621        selection_rank: value.selection_rank,
2622        selected_channels: value.selected_channels,
2623        filter_reasons: value.filter_reasons,
2624        candidate_sources: value
2625            .candidate_sources
2626            .into_iter()
2627            .map(candidate_source_to_proto)
2628            .collect(),
2629        planner_stage: Some(planner_stage_to_proto(value.planner_stage)),
2630    }
2631}
2632
2633fn recall_score_breakdown_to_proto(
2634    value: mnemara_core::RecallScoreBreakdown,
2635) -> ProtoRecallScoreBreakdown {
2636    ProtoRecallScoreBreakdown {
2637        lexical: value.lexical,
2638        semantic: value.semantic,
2639        graph: value.graph,
2640        temporal: value.temporal,
2641        policy: value.policy,
2642        total: value.total,
2643        metadata: value.metadata,
2644        curation: value.curation,
2645        episodic: value.episodic,
2646        salience: value.salience,
2647    }
2648}
2649
2650fn namespace_stats_to_proto(value: NamespaceStats) -> ProtoNamespaceStats {
2651    ProtoNamespaceStats {
2652        tenant_id: value.tenant_id,
2653        namespace: value.namespace,
2654        active_records: value.active_records,
2655        archived_records: value.archived_records,
2656        deleted_records: value.deleted_records,
2657        suppressed_records: value.suppressed_records,
2658        pinned_records: value.pinned_records,
2659    }
2660}
2661
2662fn maintenance_stats_to_proto(value: MaintenanceStats) -> ProtoMaintenanceStats {
2663    ProtoMaintenanceStats {
2664        duplicate_candidate_groups: value.duplicate_candidate_groups,
2665        duplicate_candidate_records: value.duplicate_candidate_records,
2666        tombstoned_records: value.tombstoned_records,
2667        expired_records: value.expired_records,
2668        stale_idempotency_keys: value.stale_idempotency_keys,
2669        historical_records: value.historical_records,
2670        superseded_records: value.superseded_records,
2671        lineage_links: value.lineage_links,
2672    }
2673}
2674
2675fn portable_record_to_proto(value: PortableRecord) -> ProtoPortableRecord {
2676    ProtoPortableRecord {
2677        record: Some(record_to_proto(value.record)),
2678        idempotency_key: value.idempotency_key,
2679    }
2680}
2681
2682fn snapshot_manifest_to_proto(value: SnapshotManifest) -> SnapshotReply {
2683    SnapshotReply {
2684        snapshot_id: value.snapshot_id,
2685        created_at_unix_ms: value.created_at_unix_ms,
2686        namespaces: value.namespaces,
2687        record_count: value.record_count,
2688        storage_bytes: value.storage_bytes,
2689        engine: Some(engine_tuning_info_to_proto(value.engine)),
2690    }
2691}
2692
2693fn portable_package_to_proto(value: PortableStorePackage) -> ProtoExportReply {
2694    ProtoExportReply {
2695        package_version: value.package_version,
2696        exported_at_unix_ms: value.exported_at_unix_ms,
2697        manifest: Some(snapshot_manifest_to_proto(value.manifest)),
2698        records: value
2699            .records
2700            .into_iter()
2701            .map(portable_record_to_proto)
2702            .collect(),
2703    }
2704}
2705
2706fn import_mode_from_proto(value: i32) -> Result<ImportMode, Status> {
2707    match ProtoImportMode::try_from(value).unwrap_or(ProtoImportMode::Unspecified) {
2708        ProtoImportMode::Validate => Ok(ImportMode::Validate),
2709        ProtoImportMode::Merge => Ok(ImportMode::Merge),
2710        ProtoImportMode::Replace => Ok(ImportMode::Replace),
2711        ProtoImportMode::Unspecified => Err(invalid_argument("import mode is required")),
2712    }
2713}
2714
2715fn portable_record_from_proto(value: ProtoPortableRecord) -> Result<PortableRecord, Status> {
2716    Ok(PortableRecord {
2717        record: record_from_proto(
2718            value
2719                .record
2720                .ok_or_else(|| invalid_argument("portable record payload is required"))?,
2721        )?,
2722        idempotency_key: value.idempotency_key,
2723    })
2724}
2725
2726fn snapshot_manifest_from_proto(value: SnapshotReply) -> Result<SnapshotManifest, Status> {
2727    Ok(SnapshotManifest {
2728        snapshot_id: value.snapshot_id,
2729        created_at_unix_ms: value.created_at_unix_ms,
2730        namespaces: value.namespaces,
2731        record_count: value.record_count,
2732        storage_bytes: value.storage_bytes,
2733        engine: value
2734            .engine
2735            .map(engine_tuning_info_from_proto)
2736            .transpose()?
2737            .ok_or_else(|| invalid_argument("snapshot engine tuning info is required"))?,
2738    })
2739}
2740
2741fn portable_package_from_proto(value: ProtoExportReply) -> Result<PortableStorePackage, Status> {
2742    Ok(PortableStorePackage {
2743        package_version: value.package_version,
2744        exported_at_unix_ms: value.exported_at_unix_ms,
2745        manifest: snapshot_manifest_from_proto(
2746            value
2747                .manifest
2748                .ok_or_else(|| invalid_argument("portable export manifest is required"))?,
2749        )?,
2750        records: value
2751            .records
2752            .into_iter()
2753            .map(portable_record_from_proto)
2754            .collect::<Result<Vec<_>, _>>()?,
2755    })
2756}
2757
2758fn trace_operation_kind_to_proto(value: TraceOperationKind) -> i32 {
2759    match value {
2760        TraceOperationKind::Upsert => ProtoTraceOperationKind::Upsert as i32,
2761        TraceOperationKind::BatchUpsert => ProtoTraceOperationKind::BatchUpsert as i32,
2762        TraceOperationKind::Recall => ProtoTraceOperationKind::Recall as i32,
2763        TraceOperationKind::Snapshot => ProtoTraceOperationKind::Snapshot as i32,
2764        TraceOperationKind::Stats => ProtoTraceOperationKind::Stats as i32,
2765        TraceOperationKind::IntegrityCheck => ProtoTraceOperationKind::IntegrityCheck as i32,
2766        TraceOperationKind::Repair => ProtoTraceOperationKind::Repair as i32,
2767        TraceOperationKind::Compact => ProtoTraceOperationKind::Compact as i32,
2768        TraceOperationKind::Delete => ProtoTraceOperationKind::Delete as i32,
2769        TraceOperationKind::Archive => ProtoTraceOperationKind::Archive as i32,
2770        TraceOperationKind::Suppress => ProtoTraceOperationKind::Suppress as i32,
2771        TraceOperationKind::Recover => ProtoTraceOperationKind::Recover as i32,
2772        TraceOperationKind::Export => ProtoTraceOperationKind::Export as i32,
2773        TraceOperationKind::Import => ProtoTraceOperationKind::Import as i32,
2774    }
2775}
2776
2777fn trace_operation_kind_from_proto(value: ProtoTraceOperationKind) -> Option<TraceOperationKind> {
2778    match value {
2779        ProtoTraceOperationKind::Upsert => Some(TraceOperationKind::Upsert),
2780        ProtoTraceOperationKind::BatchUpsert => Some(TraceOperationKind::BatchUpsert),
2781        ProtoTraceOperationKind::Recall => Some(TraceOperationKind::Recall),
2782        ProtoTraceOperationKind::Snapshot => Some(TraceOperationKind::Snapshot),
2783        ProtoTraceOperationKind::Stats => Some(TraceOperationKind::Stats),
2784        ProtoTraceOperationKind::IntegrityCheck => Some(TraceOperationKind::IntegrityCheck),
2785        ProtoTraceOperationKind::Repair => Some(TraceOperationKind::Repair),
2786        ProtoTraceOperationKind::Compact => Some(TraceOperationKind::Compact),
2787        ProtoTraceOperationKind::Delete => Some(TraceOperationKind::Delete),
2788        ProtoTraceOperationKind::Archive => Some(TraceOperationKind::Archive),
2789        ProtoTraceOperationKind::Suppress => Some(TraceOperationKind::Suppress),
2790        ProtoTraceOperationKind::Recover => Some(TraceOperationKind::Recover),
2791        ProtoTraceOperationKind::Export => Some(TraceOperationKind::Export),
2792        ProtoTraceOperationKind::Import => Some(TraceOperationKind::Import),
2793        ProtoTraceOperationKind::Unspecified => None,
2794    }
2795}
2796
2797fn trace_status_to_proto(value: TraceStatus) -> i32 {
2798    match value {
2799        TraceStatus::Ok => ProtoTraceStatus::Ok as i32,
2800        TraceStatus::Rejected => ProtoTraceStatus::Rejected as i32,
2801        TraceStatus::Error => ProtoTraceStatus::Error as i32,
2802    }
2803}
2804
2805fn trace_status_from_proto(value: ProtoTraceStatus) -> Option<TraceStatus> {
2806    match value {
2807        ProtoTraceStatus::Ok => Some(TraceStatus::Ok),
2808        ProtoTraceStatus::Rejected => Some(TraceStatus::Rejected),
2809        ProtoTraceStatus::Error => Some(TraceStatus::Error),
2810        ProtoTraceStatus::Unspecified => None,
2811    }
2812}
2813
2814fn operation_trace_to_proto(value: OperationTrace) -> ProtoOperationTrace {
2815    ProtoOperationTrace {
2816        trace_id: value.trace_id,
2817        correlation_id: value.correlation_id,
2818        operation: trace_operation_kind_to_proto(value.operation),
2819        transport: value.transport,
2820        backend: value.backend,
2821        admission_class: value.admission_class,
2822        tenant_id: value.tenant_id,
2823        namespace: value.namespace,
2824        principal: value.principal,
2825        store_span_id: value.store_span_id,
2826        planning_trace_id: value.planning_trace_id,
2827        started_at_unix_ms: value.started_at_unix_ms,
2828        completed_at_unix_ms: value.completed_at_unix_ms,
2829        latency_ms: value.latency_ms,
2830        status: trace_status_to_proto(value.status),
2831        status_message: value.status_message,
2832        summary: Some(ProtoOperationTraceSummary {
2833            record_id: value.summary.record_id,
2834            request_count: value.summary.request_count,
2835            query_text: value.summary.query_text,
2836            max_items: value.summary.max_items,
2837            token_budget: value.summary.token_budget,
2838            dry_run: value.summary.dry_run,
2839        }),
2840        recall_explanation: value.recall_explanation.map(recall_explanation_to_proto),
2841    }
2842}
2843
2844fn kinds_from_proto(values: Vec<String>) -> Result<Vec<MemoryRecordKind>, Status> {
2845    values
2846        .into_iter()
2847        .map(|value| record_kind_from_proto(&value))
2848        .collect()
2849}
2850
2851fn trust_levels_from_proto(values: Vec<String>) -> Result<Vec<MemoryTrustLevel>, Status> {
2852    values
2853        .into_iter()
2854        .map(|value| trust_level_from_proto(&value))
2855        .collect()
2856}
2857
2858fn quality_states_from_proto(values: Vec<String>) -> Result<Vec<MemoryQualityState>, Status> {
2859    values
2860        .into_iter()
2861        .map(|value| quality_state_from_proto(&value))
2862        .collect()
2863}
2864
2865fn conflict_states_from_proto(values: Vec<String>) -> Result<Vec<ConflictReviewState>, Status> {
2866    values
2867        .into_iter()
2868        .map(|value| conflict_review_state_from_proto(&value))
2869        .collect()
2870}
2871
2872fn resolution_kinds_from_proto(values: Vec<String>) -> Result<Vec<ConflictResolutionKind>, Status> {
2873    values
2874        .into_iter()
2875        .map(|value| conflict_resolution_kind_from_proto(&value))
2876        .collect()
2877}
2878
2879fn recall_filters_from_proto(filters: Option<ProtoRecallFilters>) -> Result<RecallFilters, Status> {
2880    let Some(filters) = filters else {
2881        return Ok(RecallFilters::default());
2882    };
2883
2884    Ok(RecallFilters {
2885        kinds: kinds_from_proto(filters.kinds)?,
2886        required_labels: filters.required_labels,
2887        source: filters.source,
2888        from_unix_ms: filters.from_unix_ms,
2889        to_unix_ms: filters.to_unix_ms,
2890        min_importance_score: filters.min_importance_score,
2891        trust_levels: trust_levels_from_proto(filters.trust_levels)?,
2892        states: quality_states_from_proto(filters.states)?,
2893        include_archived: filters.include_archived,
2894        episode_id: filters.episode_id,
2895        continuity_states: filters
2896            .continuity_states
2897            .into_iter()
2898            .map(|value| continuity_state_from_proto(&value))
2899            .collect::<Result<Vec<_>, _>>()?,
2900        unresolved_only: filters.unresolved_only,
2901        temporal_order: temporal_order_from_proto(filters.temporal_order.as_deref().unwrap_or(""))?,
2902        historical_mode: historical_mode_from_proto(
2903            filters.historical_mode.as_deref().unwrap_or(""),
2904        )?,
2905        lineage_record_id: filters.lineage_record_id,
2906        before_record_id: filters.before_record_id,
2907        after_record_id: filters.after_record_id,
2908        boundary_labels: filters.boundary_labels,
2909        recurrence_key: filters.recurrence_key,
2910        conflict_states: conflict_states_from_proto(filters.conflict_states)?,
2911        resolution_kinds: resolution_kinds_from_proto(filters.resolution_kinds)?,
2912        unresolved_conflicts_only: filters.unresolved_conflicts_only,
2913    })
2914}
2915
2916fn validate_scope_limits(scope: &MemoryScope, limits: &ServerLimits) -> Result<(), Status> {
2917    if scope.labels.len() > limits.max_labels_per_scope {
2918        return Err(invalid_argument(format!(
2919            "scope labels exceed configured max of {}",
2920            limits.max_labels_per_scope
2921        )));
2922    }
2923    Ok(())
2924}
2925
2926fn validate_record_limits(record: &MemoryRecord, limits: &ServerLimits) -> Result<(), Status> {
2927    validate_scope_limits(&record.scope, limits)?;
2928    if record.content.len() > limits.max_record_content_bytes {
2929        return Err(invalid_argument(format!(
2930            "record content exceeds configured max of {} bytes",
2931            limits.max_record_content_bytes
2932        )));
2933    }
2934    if record.summary.as_deref().map(str::len).unwrap_or(0) > limits.max_query_text_bytes {
2935        return Err(invalid_argument(format!(
2936            "record summary exceeds configured max of {} bytes",
2937            limits.max_query_text_bytes
2938        )));
2939    }
2940    Ok(())
2941}
2942
2943fn validate_recall_limits(query: &RecallQuery, limits: &ServerLimits) -> Result<(), Status> {
2944    validate_scope_limits(&query.scope, limits)?;
2945    if query.max_items == 0 {
2946        return Err(invalid_argument("recall max_items must be greater than 0"));
2947    }
2948    if query.max_items > limits.max_recall_items {
2949        return Err(invalid_argument(format!(
2950            "recall max_items exceeds configured max of {}",
2951            limits.max_recall_items
2952        )));
2953    }
2954    if query.query_text.len() > limits.max_query_text_bytes {
2955        return Err(invalid_argument(format!(
2956            "query text exceeds configured max of {} bytes",
2957            limits.max_query_text_bytes
2958        )));
2959    }
2960    Ok(())
2961}
2962
2963fn map_store_error(error: mnemara_core::Error) -> Status {
2964    match error {
2965        mnemara_core::Error::InvalidConfig(message)
2966        | mnemara_core::Error::InvalidRequest(message) => invalid_argument(message),
2967        mnemara_core::Error::Conflict(message) => Status::already_exists(message),
2968        mnemara_core::Error::Unsupported(message) => Status::unimplemented(message),
2969        mnemara_core::Error::Backend(message) => internal_status(message),
2970    }
2971}
2972
2973fn map_http_store_error(error: mnemara_core::Error) -> (StatusCode, Json<HttpErrorBody>) {
2974    let status = match error {
2975        mnemara_core::Error::InvalidConfig(_) | mnemara_core::Error::InvalidRequest(_) => {
2976            StatusCode::BAD_REQUEST
2977        }
2978        mnemara_core::Error::Conflict(_) => StatusCode::CONFLICT,
2979        mnemara_core::Error::Unsupported(_) => StatusCode::NOT_IMPLEMENTED,
2980        mnemara_core::Error::Backend(_) => StatusCode::INTERNAL_SERVER_ERROR,
2981    };
2982    (
2983        status,
2984        Json(HttpErrorBody {
2985            error: error.to_string(),
2986        }),
2987    )
2988}
2989
2990fn map_http_validation_error(status: Status) -> (StatusCode, Json<HttpErrorBody>) {
2991    (
2992        StatusCode::BAD_REQUEST,
2993        Json(HttpErrorBody {
2994            error: status.message().to_string(),
2995        }),
2996    )
2997}
2998
2999fn record_grpc_result<T>(
3000    metrics: &MethodMetrics,
3001    result: Result<Response<T>, Status>,
3002) -> Result<Response<T>, Status> {
3003    match result {
3004        Ok(response) => {
3005            metrics.record_status(&Status::ok(""));
3006            Ok(response)
3007        }
3008        Err(status) => {
3009            metrics.record_status(&status);
3010            Err(status)
3011        }
3012    }
3013}
3014
3015#[tonic::async_trait]
3016impl<S> MemoryService for GrpcMemoryService<S>
3017where
3018    S: MemoryStore + 'static,
3019{
3020    async fn upsert_memory_record(
3021        &self,
3022        request: Request<ProtoUpsertMemoryRecordRequest>,
3023    ) -> Result<Response<UpsertMemoryRecordReply>, Status> {
3024        self.runtime.metrics().grpc_upsert.record_started();
3025        validate_grpc_auth(
3026            &request,
3027            self.runtime.auth().as_ref(),
3028            AuthPermission::Write,
3029        )?;
3030        let principal = grpc_principal(&request);
3031        let started_at_unix_ms = now_unix_ms();
3032        let correlation_id = self.runtime.traces().next_id("corr");
3033        let result = async {
3034            let request = request.into_inner();
3035            let record = record_from_proto(
3036                request
3037                    .record
3038                    .ok_or_else(|| invalid_argument("memory record is required"))?,
3039            )?;
3040            let tenant_id = record.scope.tenant_id.clone();
3041            let namespace = record.scope.namespace.clone();
3042            validate_record_limits(&record, self.runtime.limits().as_ref())?;
3043            let _permit = self
3044                .runtime
3045                .admission()
3046                .acquire(AdmissionClass::Write, Some(&tenant_id))
3047                .await
3048                .map_err(|error| {
3049                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3050                })?;
3051            let receipt = self
3052                .store
3053                .upsert(mnemara_core::UpsertRequest {
3054                    record,
3055                    idempotency_key: request.idempotency_key,
3056                })
3057                .await
3058                .map_err(map_store_error)?;
3059            record_trace(
3060                &self.runtime,
3061                TraceOperationKind::Upsert,
3062                "grpc",
3063                Some(tenant_id),
3064                Some(namespace),
3065                principal,
3066                started_at_unix_ms,
3067                TraceStatus::Ok,
3068                None,
3069                OperationTraceSummary {
3070                    record_id: Some(receipt.record_id.clone()),
3071                    ..OperationTraceSummary::default()
3072                },
3073                None,
3074                correlation_id,
3075            );
3076
3077            Ok(Response::new(UpsertMemoryRecordReply {
3078                record_id: receipt.record_id,
3079                deduplicated: receipt.deduplicated,
3080                summary_refreshed: receipt.summary_refreshed,
3081            }))
3082        }
3083        .await;
3084        record_grpc_result(&self.runtime.metrics().grpc_upsert, result)
3085    }
3086
3087    async fn batch_upsert_memory_records(
3088        &self,
3089        request: Request<BatchUpsertMemoryRecordsRequest>,
3090    ) -> Result<Response<BatchUpsertMemoryRecordsReply>, Status> {
3091        self.runtime.metrics().grpc_batch_upsert.record_started();
3092        validate_grpc_auth(
3093            &request,
3094            self.runtime.auth().as_ref(),
3095            AuthPermission::Write,
3096        )?;
3097        let principal = grpc_principal(&request);
3098        let started_at_unix_ms = now_unix_ms();
3099        let correlation_id = self.runtime.traces().next_id("corr");
3100        let result = async {
3101            let request = request.into_inner();
3102            if request.requests.len() > self.runtime.limits().max_batch_upsert_requests {
3103                return Err(invalid_argument(format!(
3104                    "batch upsert request count exceeds configured max of {}",
3105                    self.runtime.limits().max_batch_upsert_requests
3106                )));
3107            }
3108            let requests = request
3109                .requests
3110                .into_iter()
3111                .map(|request| {
3112                    let record = record_from_proto(
3113                        request
3114                            .record
3115                            .ok_or_else(|| invalid_argument("memory record is required"))?,
3116                    )?;
3117                    validate_record_limits(&record, self.runtime.limits().as_ref())?;
3118                    Ok(mnemara_core::UpsertRequest {
3119                        record,
3120                        idempotency_key: request.idempotency_key,
3121                    })
3122                })
3123                .collect::<Result<Vec<_>, Status>>()?;
3124            let tenant_id = requests
3125                .first()
3126                .map(|item| item.record.scope.tenant_id.clone());
3127            let namespace = requests
3128                .first()
3129                .map(|item| item.record.scope.namespace.clone());
3130            let _permit = self
3131                .runtime
3132                .admission()
3133                .acquire(AdmissionClass::Write, tenant_id.as_deref())
3134                .await
3135                .map_err(|error| {
3136                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3137                })?;
3138
3139            let receipts: Vec<UpsertMemoryRecordReply> = self
3140                .store
3141                .batch_upsert(BatchUpsertRequest { requests })
3142                .await
3143                .map_err(map_store_error)?
3144                .into_iter()
3145                .map(|receipt| UpsertMemoryRecordReply {
3146                    record_id: receipt.record_id,
3147                    deduplicated: receipt.deduplicated,
3148                    summary_refreshed: receipt.summary_refreshed,
3149                })
3150                .collect();
3151            record_trace(
3152                &self.runtime,
3153                TraceOperationKind::BatchUpsert,
3154                "grpc",
3155                tenant_id,
3156                namespace,
3157                principal,
3158                started_at_unix_ms,
3159                TraceStatus::Ok,
3160                None,
3161                OperationTraceSummary {
3162                    request_count: Some(receipts.len() as u32),
3163                    ..OperationTraceSummary::default()
3164                },
3165                None,
3166                correlation_id,
3167            );
3168
3169            Ok(Response::new(BatchUpsertMemoryRecordsReply { receipts }))
3170        }
3171        .await;
3172        record_grpc_result(&self.runtime.metrics().grpc_batch_upsert, result)
3173    }
3174
3175    async fn recall(
3176        &self,
3177        request: Request<ProtoRecallRequest>,
3178    ) -> Result<Response<RecallReply>, Status> {
3179        self.runtime.metrics().grpc_recall.record_started();
3180        validate_grpc_auth(&request, self.runtime.auth().as_ref(), AuthPermission::Read)?;
3181        let principal = grpc_principal(&request);
3182        let started_at_unix_ms = now_unix_ms();
3183        let correlation_id = self.runtime.traces().next_id("corr");
3184        let result = async {
3185            let request = request.into_inner();
3186            let query = RecallQuery {
3187                scope: scope_from_proto(
3188                    request
3189                        .scope
3190                        .ok_or_else(|| invalid_argument("recall scope is required"))?,
3191                )?,
3192                query_text: request.query_text,
3193                max_items: request.max_items as usize,
3194                token_budget: request.token_budget.map(|value| value as usize),
3195                filters: recall_filters_from_proto(request.filters)?,
3196                include_explanation: request.include_explanation,
3197            };
3198            validate_recall_limits(&query, self.runtime.limits().as_ref())?;
3199            let tenant_id = query.scope.tenant_id.clone();
3200            let namespace = query.scope.namespace.clone();
3201            let query_text = query.query_text.clone();
3202            let max_items = query.max_items as u32;
3203            let token_budget = query.token_budget.map(|value| value as u32);
3204            let _permit = self
3205                .runtime
3206                .admission()
3207                .acquire(AdmissionClass::Read, Some(&tenant_id))
3208                .await
3209                .map_err(|error| {
3210                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3211                })?;
3212            let mut result = self.store.recall(query).await.map_err(map_store_error)?;
3213            attach_correlation_id(&mut result, &correlation_id);
3214            let recall_explanation = result.explanation.clone();
3215
3216            let hits = result
3217                .hits
3218                .into_iter()
3219                .map(|hit| ProtoRecallHit {
3220                    record: Some(record_to_proto(hit.record)),
3221                    breakdown: Some(recall_score_breakdown_to_proto(hit.breakdown)),
3222                    selected_channels: hit
3223                        .explanation
3224                        .as_ref()
3225                        .map(|value| value.selected_channels.clone())
3226                        .unwrap_or_default(),
3227                    policy_notes: hit
3228                        .explanation
3229                        .as_ref()
3230                        .map(|value| value.policy_notes.clone())
3231                        .unwrap_or_default(),
3232                    explanation: hit.explanation.map(recall_explanation_to_proto),
3233                })
3234                .collect();
3235            record_trace(
3236                &self.runtime,
3237                TraceOperationKind::Recall,
3238                "grpc",
3239                Some(tenant_id),
3240                Some(namespace),
3241                principal,
3242                started_at_unix_ms,
3243                TraceStatus::Ok,
3244                None,
3245                OperationTraceSummary {
3246                    query_text: Some(query_text),
3247                    max_items: Some(max_items),
3248                    token_budget,
3249                    ..OperationTraceSummary::default()
3250                },
3251                recall_explanation,
3252                correlation_id,
3253            );
3254
3255            Ok(Response::new(RecallReply {
3256                hits,
3257                total_candidates_examined: result.total_candidates_examined as u32,
3258                explanation: result.explanation.map(recall_explanation_to_proto),
3259            }))
3260        }
3261        .await;
3262        record_grpc_result(&self.runtime.metrics().grpc_recall, result)
3263    }
3264
3265    async fn compact(
3266        &self,
3267        request: Request<ProtoCompactRequest>,
3268    ) -> Result<Response<CompactReply>, Status> {
3269        self.runtime.metrics().grpc_compact.record_started();
3270        validate_grpc_auth(
3271            &request,
3272            self.runtime.auth().as_ref(),
3273            AuthPermission::Admin,
3274        )?;
3275        let principal = grpc_principal(&request);
3276        let started_at_unix_ms = now_unix_ms();
3277        let correlation_id = self.runtime.traces().next_id("corr");
3278        let result = async {
3279            let request = request.into_inner();
3280            let tenant_id = request.tenant_id.clone();
3281            let namespace = request.namespace.clone();
3282            let _permit = self
3283                .runtime
3284                .admission()
3285                .acquire(AdmissionClass::Admin, Some(&tenant_id))
3286                .await
3287                .map_err(|error| {
3288                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3289                })?;
3290            let report = self
3291                .store
3292                .compact(CompactionRequest {
3293                    tenant_id,
3294                    namespace: namespace.clone(),
3295                    dry_run: request.dry_run,
3296                    reason: request.reason,
3297                })
3298                .await
3299                .map_err(map_store_error)?;
3300            record_trace(
3301                &self.runtime,
3302                TraceOperationKind::Compact,
3303                "grpc",
3304                Some(request.tenant_id),
3305                namespace,
3306                principal,
3307                started_at_unix_ms,
3308                TraceStatus::Ok,
3309                None,
3310                OperationTraceSummary {
3311                    dry_run: Some(request.dry_run),
3312                    ..OperationTraceSummary::default()
3313                },
3314                None,
3315                correlation_id,
3316            );
3317
3318            Ok(Response::new(CompactReply {
3319                deduplicated_records: report.deduplicated_records,
3320                archived_records: report.archived_records,
3321                summarized_clusters: report.summarized_clusters,
3322                pruned_graph_edges: report.pruned_graph_edges,
3323                superseded_records: report.superseded_records,
3324                lineage_links_created: report.lineage_links_created,
3325                dry_run: report.dry_run,
3326            }))
3327        }
3328        .await;
3329        record_grpc_result(&self.runtime.metrics().grpc_compact, result)
3330    }
3331
3332    async fn snapshot(
3333        &self,
3334        request: Request<SnapshotRequest>,
3335    ) -> Result<Response<SnapshotReply>, Status> {
3336        self.runtime.metrics().grpc_snapshot.record_started();
3337        validate_grpc_auth(
3338            &request,
3339            self.runtime.auth().as_ref(),
3340            AuthPermission::Admin,
3341        )?;
3342        let principal = grpc_principal(&request);
3343        let started_at_unix_ms = now_unix_ms();
3344        let correlation_id = self.runtime.traces().next_id("corr");
3345        let result = async {
3346            let _permit = self
3347                .runtime
3348                .admission()
3349                .acquire(AdmissionClass::Admin, None)
3350                .await
3351                .map_err(|error| {
3352                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3353                })?;
3354            let manifest = self.store.snapshot().await.map_err(map_store_error)?;
3355            record_trace(
3356                &self.runtime,
3357                TraceOperationKind::Snapshot,
3358                "grpc",
3359                None,
3360                None,
3361                principal,
3362                started_at_unix_ms,
3363                TraceStatus::Ok,
3364                None,
3365                OperationTraceSummary::default(),
3366                None,
3367                correlation_id,
3368            );
3369            Ok(Response::new(SnapshotReply {
3370                snapshot_id: manifest.snapshot_id,
3371                created_at_unix_ms: manifest.created_at_unix_ms,
3372                namespaces: manifest.namespaces,
3373                record_count: manifest.record_count,
3374                storage_bytes: manifest.storage_bytes,
3375                engine: Some(engine_tuning_info_to_proto(manifest.engine)),
3376            }))
3377        }
3378        .await;
3379        record_grpc_result(&self.runtime.metrics().grpc_snapshot, result)
3380    }
3381
3382    async fn delete(
3383        &self,
3384        request: Request<ProtoDeleteRequest>,
3385    ) -> Result<Response<DeleteReply>, Status> {
3386        self.runtime.metrics().grpc_delete.record_started();
3387        validate_grpc_auth(
3388            &request,
3389            self.runtime.auth().as_ref(),
3390            AuthPermission::Admin,
3391        )?;
3392        let principal = grpc_principal(&request);
3393        let started_at_unix_ms = now_unix_ms();
3394        let correlation_id = self.runtime.traces().next_id("corr");
3395        let result = async {
3396            let request = request.into_inner();
3397            let tenant_id = request.tenant_id.clone();
3398            let namespace = request.namespace.clone();
3399            let _permit = self
3400                .runtime
3401                .admission()
3402                .acquire(AdmissionClass::Admin, Some(&tenant_id))
3403                .await
3404                .map_err(|error| {
3405                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3406                })?;
3407            let receipt = self
3408                .store
3409                .delete(DeleteRequest {
3410                    tenant_id,
3411                    namespace: namespace.clone(),
3412                    record_id: request.record_id,
3413                    hard_delete: request.hard_delete,
3414                    audit_reason: request.audit_reason,
3415                })
3416                .await
3417                .map_err(map_store_error)?;
3418            record_trace(
3419                &self.runtime,
3420                TraceOperationKind::Delete,
3421                "grpc",
3422                Some(request.tenant_id),
3423                Some(namespace),
3424                principal,
3425                started_at_unix_ms,
3426                TraceStatus::Ok,
3427                None,
3428                OperationTraceSummary {
3429                    record_id: Some(receipt.record_id.clone()),
3430                    ..OperationTraceSummary::default()
3431                },
3432                None,
3433                correlation_id,
3434            );
3435
3436            Ok(Response::new(DeleteReply {
3437                record_id: receipt.record_id,
3438                tombstoned: receipt.tombstoned,
3439                hard_deleted: receipt.hard_deleted,
3440            }))
3441        }
3442        .await;
3443        record_grpc_result(&self.runtime.metrics().grpc_delete, result)
3444    }
3445
3446    async fn archive(
3447        &self,
3448        request: Request<ProtoArchiveRequest>,
3449    ) -> Result<Response<ArchiveReply>, Status> {
3450        self.runtime.metrics().grpc_archive.record_started();
3451        validate_grpc_auth(
3452            &request,
3453            self.runtime.auth().as_ref(),
3454            AuthPermission::Admin,
3455        )?;
3456        let principal = grpc_principal(&request);
3457        let started_at_unix_ms = now_unix_ms();
3458        let correlation_id = self.runtime.traces().next_id("corr");
3459        let result = async {
3460            let request = request.into_inner();
3461            let tenant_id = request.tenant_id.clone();
3462            let namespace = request.namespace.clone();
3463            let _permit = self
3464                .runtime
3465                .admission()
3466                .acquire(AdmissionClass::Admin, Some(&tenant_id))
3467                .await
3468                .map_err(|error| {
3469                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3470                })?;
3471            let receipt = self
3472                .store
3473                .archive(ArchiveRequest {
3474                    tenant_id,
3475                    namespace: namespace.clone(),
3476                    record_id: request.record_id,
3477                    dry_run: request.dry_run,
3478                    audit_reason: request.audit_reason,
3479                })
3480                .await
3481                .map_err(map_store_error)?;
3482            record_trace(
3483                &self.runtime,
3484                TraceOperationKind::Archive,
3485                "grpc",
3486                Some(request.tenant_id),
3487                Some(namespace),
3488                principal,
3489                started_at_unix_ms,
3490                TraceStatus::Ok,
3491                None,
3492                OperationTraceSummary {
3493                    record_id: Some(receipt.record_id.clone()),
3494                    dry_run: Some(receipt.dry_run),
3495                    ..OperationTraceSummary::default()
3496                },
3497                None,
3498                correlation_id,
3499            );
3500
3501            Ok(Response::new(ArchiveReply {
3502                record_id: receipt.record_id,
3503                previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3504                previous_historical_state: historical_state_to_proto(
3505                    receipt.previous_historical_state,
3506                ),
3507                quality_state: quality_state_to_proto(receipt.quality_state),
3508                historical_state: historical_state_to_proto(receipt.historical_state),
3509                changed: receipt.changed,
3510                dry_run: receipt.dry_run,
3511            }))
3512        }
3513        .await;
3514        record_grpc_result(&self.runtime.metrics().grpc_archive, result)
3515    }
3516
3517    async fn suppress(
3518        &self,
3519        request: Request<ProtoSuppressRequest>,
3520    ) -> Result<Response<SuppressReply>, Status> {
3521        self.runtime.metrics().grpc_suppress.record_started();
3522        validate_grpc_auth(
3523            &request,
3524            self.runtime.auth().as_ref(),
3525            AuthPermission::Admin,
3526        )?;
3527        let principal = grpc_principal(&request);
3528        let started_at_unix_ms = now_unix_ms();
3529        let correlation_id = self.runtime.traces().next_id("corr");
3530        let result = async {
3531            let request = request.into_inner();
3532            let tenant_id = request.tenant_id.clone();
3533            let namespace = request.namespace.clone();
3534            let _permit = self
3535                .runtime
3536                .admission()
3537                .acquire(AdmissionClass::Admin, Some(&tenant_id))
3538                .await
3539                .map_err(|error| {
3540                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3541                })?;
3542            let receipt = self
3543                .store
3544                .suppress(SuppressRequest {
3545                    tenant_id,
3546                    namespace: namespace.clone(),
3547                    record_id: request.record_id,
3548                    dry_run: request.dry_run,
3549                    audit_reason: request.audit_reason,
3550                })
3551                .await
3552                .map_err(map_store_error)?;
3553            record_trace(
3554                &self.runtime,
3555                TraceOperationKind::Suppress,
3556                "grpc",
3557                Some(request.tenant_id),
3558                Some(namespace),
3559                principal,
3560                started_at_unix_ms,
3561                TraceStatus::Ok,
3562                None,
3563                OperationTraceSummary {
3564                    record_id: Some(receipt.record_id.clone()),
3565                    dry_run: Some(receipt.dry_run),
3566                    ..OperationTraceSummary::default()
3567                },
3568                None,
3569                correlation_id,
3570            );
3571
3572            Ok(Response::new(SuppressReply {
3573                record_id: receipt.record_id,
3574                previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3575                previous_historical_state: historical_state_to_proto(
3576                    receipt.previous_historical_state,
3577                ),
3578                quality_state: quality_state_to_proto(receipt.quality_state),
3579                historical_state: historical_state_to_proto(receipt.historical_state),
3580                changed: receipt.changed,
3581                dry_run: receipt.dry_run,
3582            }))
3583        }
3584        .await;
3585        record_grpc_result(&self.runtime.metrics().grpc_suppress, result)
3586    }
3587
3588    async fn recover(
3589        &self,
3590        request: Request<ProtoRecoverRequest>,
3591    ) -> Result<Response<RecoverReply>, Status> {
3592        self.runtime.metrics().grpc_recover.record_started();
3593        validate_grpc_auth(
3594            &request,
3595            self.runtime.auth().as_ref(),
3596            AuthPermission::Admin,
3597        )?;
3598        let principal = grpc_principal(&request);
3599        let started_at_unix_ms = now_unix_ms();
3600        let correlation_id = self.runtime.traces().next_id("corr");
3601        let result = async {
3602            let request = request.into_inner();
3603            let tenant_id = request.tenant_id.clone();
3604            let namespace = request.namespace.clone();
3605            let _permit = self
3606                .runtime
3607                .admission()
3608                .acquire(AdmissionClass::Admin, Some(&tenant_id))
3609                .await
3610                .map_err(|error| {
3611                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3612                })?;
3613            let receipt = self
3614                .store
3615                .recover(RecoverRequest {
3616                    tenant_id,
3617                    namespace: namespace.clone(),
3618                    record_id: request.record_id,
3619                    dry_run: request.dry_run,
3620                    audit_reason: request.audit_reason,
3621                    quality_state: quality_state_from_proto(&request.quality_state)?,
3622                    historical_state: request
3623                        .historical_state
3624                        .as_deref()
3625                        .map(historical_state_from_proto)
3626                        .transpose()?,
3627                })
3628                .await
3629                .map_err(map_store_error)?;
3630            record_trace(
3631                &self.runtime,
3632                TraceOperationKind::Recover,
3633                "grpc",
3634                Some(request.tenant_id),
3635                Some(namespace),
3636                principal,
3637                started_at_unix_ms,
3638                TraceStatus::Ok,
3639                None,
3640                OperationTraceSummary {
3641                    record_id: Some(receipt.record_id.clone()),
3642                    dry_run: Some(receipt.dry_run),
3643                    ..OperationTraceSummary::default()
3644                },
3645                None,
3646                correlation_id,
3647            );
3648
3649            Ok(Response::new(RecoverReply {
3650                record_id: receipt.record_id,
3651                previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3652                previous_historical_state: historical_state_to_proto(
3653                    receipt.previous_historical_state,
3654                ),
3655                quality_state: quality_state_to_proto(receipt.quality_state),
3656                historical_state: historical_state_to_proto(receipt.historical_state),
3657                changed: receipt.changed,
3658                dry_run: receipt.dry_run,
3659            }))
3660        }
3661        .await;
3662        record_grpc_result(&self.runtime.metrics().grpc_recover, result)
3663    }
3664
3665    async fn stats(
3666        &self,
3667        request: Request<ProtoStoreStatsRequest>,
3668    ) -> Result<Response<ProtoStoreStatsReply>, Status> {
3669        self.runtime.metrics().grpc_stats.record_started();
3670        validate_grpc_auth(
3671            &request,
3672            self.runtime.auth().as_ref(),
3673            AuthPermission::Admin,
3674        )?;
3675        let principal = grpc_principal(&request);
3676        let started_at_unix_ms = now_unix_ms();
3677        let correlation_id = self.runtime.traces().next_id("corr");
3678        let result = async {
3679            let request = request.into_inner();
3680            let tenant_id = request.tenant_id.clone();
3681            let namespace = request.namespace.clone();
3682            let _permit = self
3683                .runtime
3684                .admission()
3685                .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3686                .await
3687                .map_err(|error| {
3688                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3689                })?;
3690            let report = self
3691                .store
3692                .stats(StoreStatsRequest {
3693                    tenant_id: tenant_id.clone(),
3694                    namespace: namespace.clone(),
3695                })
3696                .await
3697                .map_err(map_store_error)?;
3698            record_trace(
3699                &self.runtime,
3700                TraceOperationKind::Stats,
3701                "grpc",
3702                tenant_id,
3703                namespace,
3704                principal,
3705                started_at_unix_ms,
3706                TraceStatus::Ok,
3707                None,
3708                OperationTraceSummary::default(),
3709                None,
3710                correlation_id,
3711            );
3712
3713            Ok(Response::new(ProtoStoreStatsReply {
3714                generated_at_unix_ms: report.generated_at_unix_ms,
3715                total_records: report.total_records,
3716                storage_bytes: report.storage_bytes,
3717                namespaces: report
3718                    .namespaces
3719                    .into_iter()
3720                    .map(namespace_stats_to_proto)
3721                    .collect(),
3722                maintenance: Some(maintenance_stats_to_proto(report.maintenance)),
3723                engine: Some(engine_tuning_info_to_proto(report.engine)),
3724            }))
3725        }
3726        .await;
3727        record_grpc_result(&self.runtime.metrics().grpc_stats, result)
3728    }
3729
3730    async fn integrity_check(
3731        &self,
3732        request: Request<ProtoIntegrityCheckRequest>,
3733    ) -> Result<Response<ProtoIntegrityCheckReply>, Status> {
3734        self.runtime.metrics().grpc_integrity.record_started();
3735        validate_grpc_auth(
3736            &request,
3737            self.runtime.auth().as_ref(),
3738            AuthPermission::Admin,
3739        )?;
3740        let principal = grpc_principal(&request);
3741        let started_at_unix_ms = now_unix_ms();
3742        let correlation_id = self.runtime.traces().next_id("corr");
3743        let result = async {
3744            let request = request.into_inner();
3745            let tenant_id = request.tenant_id.clone();
3746            let namespace = request.namespace.clone();
3747            let _permit = self
3748                .runtime
3749                .admission()
3750                .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3751                .await
3752                .map_err(|error| {
3753                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3754                })?;
3755            let report = self
3756                .store
3757                .integrity_check(IntegrityCheckRequest {
3758                    tenant_id: tenant_id.clone(),
3759                    namespace: namespace.clone(),
3760                })
3761                .await
3762                .map_err(map_store_error)?;
3763            record_trace(
3764                &self.runtime,
3765                TraceOperationKind::IntegrityCheck,
3766                "grpc",
3767                tenant_id,
3768                namespace,
3769                principal,
3770                started_at_unix_ms,
3771                TraceStatus::Ok,
3772                None,
3773                OperationTraceSummary::default(),
3774                None,
3775                correlation_id,
3776            );
3777
3778            Ok(Response::new(ProtoIntegrityCheckReply {
3779                generated_at_unix_ms: report.generated_at_unix_ms,
3780                healthy: report.healthy,
3781                scanned_records: report.scanned_records,
3782                scanned_idempotency_keys: report.scanned_idempotency_keys,
3783                stale_idempotency_keys: report.stale_idempotency_keys,
3784                missing_idempotency_keys: report.missing_idempotency_keys,
3785                duplicate_active_records: report.duplicate_active_records,
3786            }))
3787        }
3788        .await;
3789        record_grpc_result(&self.runtime.metrics().grpc_integrity, result)
3790    }
3791
3792    async fn repair(
3793        &self,
3794        request: Request<ProtoRepairRequest>,
3795    ) -> Result<Response<ProtoRepairReply>, Status> {
3796        self.runtime.metrics().grpc_repair.record_started();
3797        validate_grpc_auth(
3798            &request,
3799            self.runtime.auth().as_ref(),
3800            AuthPermission::Admin,
3801        )?;
3802        let principal = grpc_principal(&request);
3803        let started_at_unix_ms = now_unix_ms();
3804        let correlation_id = self.runtime.traces().next_id("corr");
3805        let result = async {
3806            let request = request.into_inner();
3807            let tenant_id = request.tenant_id.clone();
3808            let namespace = request.namespace.clone();
3809            let _permit = self
3810                .runtime
3811                .admission()
3812                .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3813                .await
3814                .map_err(|error| {
3815                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3816                })?;
3817            let report = self
3818                .store
3819                .repair(RepairRequest {
3820                    tenant_id: tenant_id.clone(),
3821                    namespace: namespace.clone(),
3822                    dry_run: request.dry_run,
3823                    reason: request.reason,
3824                    remove_stale_idempotency_keys: request.remove_stale_idempotency_keys,
3825                    rebuild_missing_idempotency_keys: request.rebuild_missing_idempotency_keys,
3826                })
3827                .await
3828                .map_err(map_store_error)?;
3829            record_trace(
3830                &self.runtime,
3831                TraceOperationKind::Repair,
3832                "grpc",
3833                tenant_id,
3834                namespace,
3835                principal,
3836                started_at_unix_ms,
3837                TraceStatus::Ok,
3838                None,
3839                OperationTraceSummary {
3840                    dry_run: Some(request.dry_run),
3841                    ..OperationTraceSummary::default()
3842                },
3843                None,
3844                correlation_id,
3845            );
3846
3847            Ok(Response::new(ProtoRepairReply {
3848                dry_run: report.dry_run,
3849                scanned_records: report.scanned_records,
3850                scanned_idempotency_keys: report.scanned_idempotency_keys,
3851                removed_stale_idempotency_keys: report.removed_stale_idempotency_keys,
3852                rebuilt_missing_idempotency_keys: report.rebuilt_missing_idempotency_keys,
3853                healthy_after: report.healthy_after,
3854            }))
3855        }
3856        .await;
3857        record_grpc_result(&self.runtime.metrics().grpc_repair, result)
3858    }
3859
3860    async fn list_traces(
3861        &self,
3862        request: Request<ProtoListTracesRequest>,
3863    ) -> Result<Response<ProtoListTracesReply>, Status> {
3864        validate_grpc_auth(
3865            &request,
3866            self.runtime.auth().as_ref(),
3867            AuthPermission::Admin,
3868        )?;
3869        let request = request.into_inner();
3870        self.runtime
3871            .metrics()
3872            .trace_reads
3873            .fetch_add(1, Ordering::Relaxed);
3874        let traces = self.runtime.traces().list(&TraceListRequest {
3875            tenant_id: request.tenant_id,
3876            namespace: request.namespace,
3877            operation: ProtoTraceOperationKind::try_from(request.operation)
3878                .ok()
3879                .and_then(trace_operation_kind_from_proto),
3880            status: ProtoTraceStatus::try_from(request.status)
3881                .ok()
3882                .and_then(trace_status_from_proto),
3883            before_started_at_unix_ms: request.before_started_at_unix_ms,
3884            limit: request.limit.map(|value| value as usize),
3885        });
3886        Ok(Response::new(ProtoListTracesReply {
3887            traces: traces.into_iter().map(operation_trace_to_proto).collect(),
3888        }))
3889    }
3890
3891    async fn get_trace(
3892        &self,
3893        request: Request<ProtoGetTraceRequest>,
3894    ) -> Result<Response<ProtoOperationTrace>, Status> {
3895        validate_grpc_auth(
3896            &request,
3897            self.runtime.auth().as_ref(),
3898            AuthPermission::Admin,
3899        )?;
3900        let trace_id = request.into_inner().trace_id;
3901        self.runtime
3902            .metrics()
3903            .trace_reads
3904            .fetch_add(1, Ordering::Relaxed);
3905        let trace = self
3906            .runtime
3907            .traces()
3908            .get(&trace_id)
3909            .ok_or_else(|| Status::not_found(format!("trace {trace_id} not found")))?;
3910        Ok(Response::new(operation_trace_to_proto(trace)))
3911    }
3912
3913    async fn export(
3914        &self,
3915        request: Request<ProtoExportRequest>,
3916    ) -> Result<Response<ProtoExportReply>, Status> {
3917        validate_grpc_auth(
3918            &request,
3919            self.runtime.auth().as_ref(),
3920            AuthPermission::Admin,
3921        )?;
3922        let principal = grpc_principal(&request);
3923        let started_at_unix_ms = now_unix_ms();
3924        let correlation_id = self.runtime.traces().next_id("corr");
3925        let request = request.into_inner();
3926        let export_request = ExportRequest {
3927            tenant_id: request.tenant_id,
3928            namespace: request.namespace,
3929            include_archived: request.include_archived,
3930        };
3931        let _permit = self
3932            .runtime
3933            .admission()
3934            .acquire(AdmissionClass::Admin, export_request.tenant_id.as_deref())
3935            .await
3936            .map_err(|error| map_grpc_admission_error(self.runtime.metrics().as_ref(), error))?;
3937        let package = self
3938            .store
3939            .export(export_request.clone())
3940            .await
3941            .map_err(map_store_error)?;
3942        record_trace(
3943            &self.runtime,
3944            TraceOperationKind::Export,
3945            "grpc",
3946            export_request.tenant_id,
3947            export_request.namespace,
3948            principal,
3949            started_at_unix_ms,
3950            TraceStatus::Ok,
3951            None,
3952            OperationTraceSummary::default(),
3953            None,
3954            correlation_id,
3955        );
3956        Ok(Response::new(portable_package_to_proto(package)))
3957    }
3958
3959    async fn import(
3960        &self,
3961        request: Request<ProtoImportRequest>,
3962    ) -> Result<Response<ProtoImportReply>, Status> {
3963        validate_grpc_auth(
3964            &request,
3965            self.runtime.auth().as_ref(),
3966            AuthPermission::Admin,
3967        )?;
3968        let principal = grpc_principal(&request);
3969        let started_at_unix_ms = now_unix_ms();
3970        let correlation_id = self.runtime.traces().next_id("corr");
3971        let request = request.into_inner();
3972        let import_request = ImportRequest {
3973            package: portable_package_from_proto(
3974                request
3975                    .package
3976                    .ok_or_else(|| invalid_argument("portable package is required"))?,
3977            )?,
3978            mode: import_mode_from_proto(request.mode)?,
3979            dry_run: request.dry_run,
3980        };
3981        let tenant_id = import_request
3982            .package
3983            .records
3984            .first()
3985            .map(|entry| entry.record.scope.tenant_id.clone());
3986        let namespace = import_request
3987            .package
3988            .records
3989            .first()
3990            .map(|entry| entry.record.scope.namespace.clone());
3991        let _permit = self
3992            .runtime
3993            .admission()
3994            .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3995            .await
3996            .map_err(|error| map_grpc_admission_error(self.runtime.metrics().as_ref(), error))?;
3997        let report = self
3998            .store
3999            .import(import_request.clone())
4000            .await
4001            .map_err(map_store_error)?;
4002        record_trace(
4003            &self.runtime,
4004            TraceOperationKind::Import,
4005            "grpc",
4006            tenant_id,
4007            namespace,
4008            principal,
4009            started_at_unix_ms,
4010            if report.failed_records.is_empty() {
4011                TraceStatus::Ok
4012            } else {
4013                TraceStatus::Error
4014            },
4015            (!report.failed_records.is_empty())
4016                .then(|| format!("{} import validation failures", report.failed_records.len())),
4017            OperationTraceSummary {
4018                request_count: Some(import_request.package.records.len() as u32),
4019                dry_run: Some(import_request.dry_run),
4020                ..OperationTraceSummary::default()
4021            },
4022            None,
4023            correlation_id,
4024        );
4025        Ok(Response::new(ProtoImportReply {
4026            mode: request.mode,
4027            dry_run: report.dry_run,
4028            applied: report.applied,
4029            compatible_package: report.compatible_package,
4030            package_version: report.package_version,
4031            validated_records: report.validated_records,
4032            imported_records: report.imported_records,
4033            skipped_records: report.skipped_records,
4034            replaced_existing: report.replaced_existing,
4035            snapshot_id: report.snapshot_id,
4036            failed_records: report
4037                .failed_records
4038                .into_iter()
4039                .map(|failure| mnemara_protocol::v1::ImportFailure {
4040                    record_id: failure.record_id,
4041                    reason: failure.reason,
4042                })
4043                .collect(),
4044        }))
4045    }
4046}