1#![forbid(unsafe_code)]
2
3mod admission;
4mod observability;
5
6use std::net::SocketAddr;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9
10use admission::{
11 AdmissionClass, AdmissionConfig, AdmissionController, AdmissionError, RuntimeAdmissionStatus,
12};
13use axum::extract::{Path, Query, Request as AxumRequest};
14use axum::http::HeaderMap;
15use axum::http::StatusCode;
16use axum::middleware::{self, Next};
17use axum::response::IntoResponse;
18use axum::routing::{get, post};
19use axum::{Json, Router};
20use mnemara_core::{
21 AffectiveAnnotation, AffectiveAnnotationProvenance, ArchiveReceipt, ArchiveRequest,
22 ArtifactPointer, BatchUpsertRequest, CompactionReport, CompactionRequest, DeleteReceipt,
23 DeleteRequest, EPISODE_SCHEMA_VERSION, EmbeddingProviderKind, EngineTuningInfo, EpisodeContext,
24 EpisodeContinuityState, EpisodeSalience, ExportRequest, ImportMode, ImportReport,
25 ImportRequest, IntegrityCheckReport, IntegrityCheckRequest, LineageLink, LineageRelationKind,
26 MaintenanceStats, MemoryHistoricalState, MemoryQualityState, MemoryRecord, MemoryRecordKind,
27 MemoryScope, MemoryStore, MemoryTrustLevel, NamespaceStats, OperationTrace,
28 OperationTraceSummary, PortableRecord, PortableStorePackage, RecallCandidateSource,
29 RecallExplanation, RecallFilters, RecallHistoricalMode, RecallPlannerStage,
30 RecallPlanningProfile, RecallPlanningTrace, RecallPolicyProfile, RecallQuery, RecallResult,
31 RecallScorerKind, RecallScoringProfile, RecallTemporalOrder, RecallTraceCandidate,
32 RecoverReceipt, RecoverRequest, RepairReport, RepairRequest, SnapshotManifest,
33 StoreStatsReport, StoreStatsRequest, SuppressReceipt, SuppressRequest, TraceListRequest,
34 TraceOperationKind, TraceStatus, UpsertReceipt, UpsertRequest,
35};
36use mnemara_protocol::v1::memory_service_server::{MemoryService, MemoryServiceServer};
37use mnemara_protocol::v1::{
38 AffectiveAnnotation as ProtoAffectiveAnnotation, ArchiveReply,
39 ArchiveRequest as ProtoArchiveRequest, ArtifactPointer as ProtoArtifactPointer,
40 BatchUpsertMemoryRecordsReply, BatchUpsertMemoryRecordsRequest, CompactReply,
41 CompactRequest as ProtoCompactRequest, DeleteReply, DeleteRequest as ProtoDeleteRequest,
42 EmbeddingProviderKind as ProtoEmbeddingProviderKind, EngineTuningInfo as ProtoEngineTuningInfo,
43 EpisodeContext as ProtoEpisodeContext, EpisodeSalience as ProtoEpisodeSalience,
44 ExportReply as ProtoExportReply, ExportRequest as ProtoExportRequest,
45 GetTraceRequest as ProtoGetTraceRequest, ImportMode as ProtoImportMode,
46 ImportReply as ProtoImportReply, ImportRequest as ProtoImportRequest,
47 IntegrityCheckReply as ProtoIntegrityCheckReply,
48 IntegrityCheckRequest as ProtoIntegrityCheckRequest, LineageLink as ProtoLineageLink,
49 ListTracesReply as ProtoListTracesReply, ListTracesRequest as ProtoListTracesRequest,
50 MaintenanceStats as ProtoMaintenanceStats, MemoryRecord as ProtoMemoryRecord,
51 MemoryScope as ProtoMemoryScope, NamespaceStats as ProtoNamespaceStats,
52 OperationTrace as ProtoOperationTrace, OperationTraceSummary as ProtoOperationTraceSummary,
53 PortableRecord as ProtoPortableRecord, RecallExplanation as ProtoRecallExplanation,
54 RecallFilters as ProtoRecallFilters, RecallHit as ProtoRecallHit,
55 RecallPlanningTrace as ProtoRecallPlanningTrace,
56 RecallPolicyProfile as ProtoRecallPolicyProfile, RecallReply,
57 RecallRequest as ProtoRecallRequest, RecallScoreBreakdown as ProtoRecallScoreBreakdown,
58 RecallScorerKind as ProtoRecallScorerKind, RecallScoringProfile as ProtoRecallScoringProfile,
59 RecallTraceCandidate as ProtoRecallTraceCandidate, RecoverReply,
60 RecoverRequest as ProtoRecoverRequest, RepairReply as ProtoRepairReply,
61 RepairRequest as ProtoRepairRequest, SnapshotReply, SnapshotRequest,
62 StoreStatsReply as ProtoStoreStatsReply, StoreStatsRequest as ProtoStoreStatsRequest,
63 SuppressReply, SuppressRequest as ProtoSuppressRequest,
64 TraceOperationKind as ProtoTraceOperationKind, TraceStatus as ProtoTraceStatus,
65 UpsertMemoryRecordReply, UpsertMemoryRecordRequest as ProtoUpsertMemoryRecordRequest,
66};
67use observability::{TraceRegistry, TraceRegistrySnapshot, now_unix_ms};
68use tonic::{Request, Response, Status};
69
70#[derive(Debug, Clone, serde::Serialize)]
71struct HealthStatus {
72 status: &'static str,
73}
74
75#[derive(Debug, Clone, serde::Serialize)]
76struct ReadyStatus {
77 status: &'static str,
78 record_count: u64,
79 namespaces: Vec<String>,
80}
81
82#[derive(Debug, Clone, serde::Serialize)]
83struct ServerRuntimeStatus {
84 backend: String,
85 admission: RuntimeAdmissionStatus,
86 traces: TraceRegistrySnapshot,
87}
88
89#[derive(Debug, Clone, serde::Serialize)]
90struct HttpErrorBody {
91 error: String,
92}
93
/// Outcome counters kept for one RPC method.
///
/// Each field is an independent atomic counter that starts at zero.
#[derive(Debug, Default)]
struct MethodMetrics {
    /// Requests that entered the method.
    started: AtomicU64,
    /// Requests finishing with `tonic::Code::Ok`.
    ok: AtomicU64,
    /// Requests finishing with `tonic::Code::InvalidArgument`.
    invalid_argument: AtomicU64,
    /// Requests finishing with `tonic::Code::AlreadyExists`.
    conflict: AtomicU64,
    /// Requests finishing with `tonic::Code::Unimplemented`.
    unimplemented: AtomicU64,
    /// Requests finishing with any other status code.
    internal: AtomicU64,
}
103
104impl MethodMetrics {
105 fn record_started(&self) {
106 self.started.fetch_add(1, Ordering::Relaxed);
107 }
108
109 fn record_status(&self, status: &Status) {
110 match status.code() {
111 tonic::Code::Ok => {
112 self.ok.fetch_add(1, Ordering::Relaxed);
113 }
114 tonic::Code::InvalidArgument => {
115 self.invalid_argument.fetch_add(1, Ordering::Relaxed);
116 }
117 tonic::Code::AlreadyExists => {
118 self.conflict.fetch_add(1, Ordering::Relaxed);
119 }
120 tonic::Code::Unimplemented => {
121 self.unimplemented.fetch_add(1, Ordering::Relaxed);
122 }
123 _ => {
124 self.internal.fetch_add(1, Ordering::Relaxed);
125 }
126 }
127 }
128}
129
130#[derive(Debug, Default)]
131pub struct ServerMetrics {
132 grpc_upsert: MethodMetrics,
133 grpc_batch_upsert: MethodMetrics,
134 grpc_recall: MethodMetrics,
135 grpc_compact: MethodMetrics,
136 grpc_snapshot: MethodMetrics,
137 grpc_delete: MethodMetrics,
138 grpc_archive: MethodMetrics,
139 grpc_suppress: MethodMetrics,
140 grpc_recover: MethodMetrics,
141 grpc_stats: MethodMetrics,
142 grpc_integrity: MethodMetrics,
143 grpc_repair: MethodMetrics,
144 http_healthz: AtomicU64,
145 http_readyz: AtomicU64,
146 http_snapshot: AtomicU64,
147 http_compact: AtomicU64,
148 http_delete: AtomicU64,
149 http_archive: AtomicU64,
150 http_suppress: AtomicU64,
151 http_recover: AtomicU64,
152 http_metrics: AtomicU64,
153 http_traces: AtomicU64,
154 http_runtime: AtomicU64,
155 http_export: AtomicU64,
156 http_import: AtomicU64,
157 http_rejected_body_too_large: AtomicU64,
158 admission_rejected: AtomicU64,
159 admission_timed_out: AtomicU64,
160 trace_records: AtomicU64,
161 trace_evictions: AtomicU64,
162 trace_reads: AtomicU64,
163}
164
165impl ServerMetrics {
166 pub fn render(&self) -> String {
167 let mut output = String::new();
168 append_counter(
169 &mut output,
170 "mnemara_grpc_upsert_requests_started_total",
171 self.grpc_upsert.started.load(Ordering::Relaxed),
172 );
173 append_counter(
174 &mut output,
175 "mnemara_grpc_upsert_requests_ok_total",
176 self.grpc_upsert.ok.load(Ordering::Relaxed),
177 );
178 append_counter(
179 &mut output,
180 "mnemara_grpc_upsert_requests_invalid_argument_total",
181 self.grpc_upsert.invalid_argument.load(Ordering::Relaxed),
182 );
183 append_counter(
184 &mut output,
185 "mnemara_grpc_upsert_requests_conflict_total",
186 self.grpc_upsert.conflict.load(Ordering::Relaxed),
187 );
188 append_counter(
189 &mut output,
190 "mnemara_grpc_upsert_requests_unimplemented_total",
191 self.grpc_upsert.unimplemented.load(Ordering::Relaxed),
192 );
193 append_counter(
194 &mut output,
195 "mnemara_grpc_upsert_requests_internal_total",
196 self.grpc_upsert.internal.load(Ordering::Relaxed),
197 );
198 append_counter(
199 &mut output,
200 "mnemara_grpc_batch_upsert_requests_started_total",
201 self.grpc_batch_upsert.started.load(Ordering::Relaxed),
202 );
203 append_counter(
204 &mut output,
205 "mnemara_grpc_batch_upsert_requests_ok_total",
206 self.grpc_batch_upsert.ok.load(Ordering::Relaxed),
207 );
208 append_counter(
209 &mut output,
210 "mnemara_grpc_batch_upsert_requests_invalid_argument_total",
211 self.grpc_batch_upsert
212 .invalid_argument
213 .load(Ordering::Relaxed),
214 );
215 append_counter(
216 &mut output,
217 "mnemara_grpc_recall_requests_started_total",
218 self.grpc_recall.started.load(Ordering::Relaxed),
219 );
220 append_counter(
221 &mut output,
222 "mnemara_grpc_recall_requests_ok_total",
223 self.grpc_recall.ok.load(Ordering::Relaxed),
224 );
225 append_counter(
226 &mut output,
227 "mnemara_grpc_recall_requests_invalid_argument_total",
228 self.grpc_recall.invalid_argument.load(Ordering::Relaxed),
229 );
230 append_counter(
231 &mut output,
232 "mnemara_grpc_compact_requests_started_total",
233 self.grpc_compact.started.load(Ordering::Relaxed),
234 );
235 append_counter(
236 &mut output,
237 "mnemara_grpc_compact_requests_ok_total",
238 self.grpc_compact.ok.load(Ordering::Relaxed),
239 );
240 append_counter(
241 &mut output,
242 "mnemara_grpc_snapshot_requests_started_total",
243 self.grpc_snapshot.started.load(Ordering::Relaxed),
244 );
245 append_counter(
246 &mut output,
247 "mnemara_grpc_snapshot_requests_ok_total",
248 self.grpc_snapshot.ok.load(Ordering::Relaxed),
249 );
250 append_counter(
251 &mut output,
252 "mnemara_grpc_delete_requests_started_total",
253 self.grpc_delete.started.load(Ordering::Relaxed),
254 );
255 append_counter(
256 &mut output,
257 "mnemara_grpc_delete_requests_ok_total",
258 self.grpc_delete.ok.load(Ordering::Relaxed),
259 );
260 append_counter(
261 &mut output,
262 "mnemara_grpc_archive_requests_started_total",
263 self.grpc_archive.started.load(Ordering::Relaxed),
264 );
265 append_counter(
266 &mut output,
267 "mnemara_grpc_archive_requests_ok_total",
268 self.grpc_archive.ok.load(Ordering::Relaxed),
269 );
270 append_counter(
271 &mut output,
272 "mnemara_grpc_suppress_requests_started_total",
273 self.grpc_suppress.started.load(Ordering::Relaxed),
274 );
275 append_counter(
276 &mut output,
277 "mnemara_grpc_suppress_requests_ok_total",
278 self.grpc_suppress.ok.load(Ordering::Relaxed),
279 );
280 append_counter(
281 &mut output,
282 "mnemara_grpc_recover_requests_started_total",
283 self.grpc_recover.started.load(Ordering::Relaxed),
284 );
285 append_counter(
286 &mut output,
287 "mnemara_grpc_recover_requests_ok_total",
288 self.grpc_recover.ok.load(Ordering::Relaxed),
289 );
290 append_counter(
291 &mut output,
292 "mnemara_http_healthz_requests_total",
293 self.http_healthz.load(Ordering::Relaxed),
294 );
295 append_counter(
296 &mut output,
297 "mnemara_http_readyz_requests_total",
298 self.http_readyz.load(Ordering::Relaxed),
299 );
300 append_counter(
301 &mut output,
302 "mnemara_http_snapshot_requests_total",
303 self.http_snapshot.load(Ordering::Relaxed),
304 );
305 append_counter(
306 &mut output,
307 "mnemara_http_compact_requests_total",
308 self.http_compact.load(Ordering::Relaxed),
309 );
310 append_counter(
311 &mut output,
312 "mnemara_http_delete_requests_total",
313 self.http_delete.load(Ordering::Relaxed),
314 );
315 append_counter(
316 &mut output,
317 "mnemara_http_archive_requests_total",
318 self.http_archive.load(Ordering::Relaxed),
319 );
320 append_counter(
321 &mut output,
322 "mnemara_http_suppress_requests_total",
323 self.http_suppress.load(Ordering::Relaxed),
324 );
325 append_counter(
326 &mut output,
327 "mnemara_http_recover_requests_total",
328 self.http_recover.load(Ordering::Relaxed),
329 );
330 append_counter(
331 &mut output,
332 "mnemara_http_metrics_requests_total",
333 self.http_metrics.load(Ordering::Relaxed),
334 );
335 append_counter(
336 &mut output,
337 "mnemara_http_traces_requests_total",
338 self.http_traces.load(Ordering::Relaxed),
339 );
340 append_counter(
341 &mut output,
342 "mnemara_http_runtime_requests_total",
343 self.http_runtime.load(Ordering::Relaxed),
344 );
345 append_counter(
346 &mut output,
347 "mnemara_http_export_requests_total",
348 self.http_export.load(Ordering::Relaxed),
349 );
350 append_counter(
351 &mut output,
352 "mnemara_http_import_requests_total",
353 self.http_import.load(Ordering::Relaxed),
354 );
355 append_counter(
356 &mut output,
357 "mnemara_http_rejected_body_too_large_total",
358 self.http_rejected_body_too_large.load(Ordering::Relaxed),
359 );
360 append_counter(
361 &mut output,
362 "mnemara_admission_rejected_total",
363 self.admission_rejected.load(Ordering::Relaxed),
364 );
365 append_counter(
366 &mut output,
367 "mnemara_admission_timed_out_total",
368 self.admission_timed_out.load(Ordering::Relaxed),
369 );
370 append_counter(
371 &mut output,
372 "mnemara_trace_records_total",
373 self.trace_records.load(Ordering::Relaxed),
374 );
375 append_counter(
376 &mut output,
377 "mnemara_trace_evictions_total",
378 self.trace_evictions.load(Ordering::Relaxed),
379 );
380 append_counter(
381 &mut output,
382 "mnemara_trace_reads_total",
383 self.trace_reads.load(Ordering::Relaxed),
384 );
385 output
386 }
387}
388
/// Appends one counter to `output`: a `# TYPE <name> counter` line followed by a
/// `<name> <value>` sample line, each terminated by `\n`.
fn append_counter(output: &mut String, name: &str, value: u64) {
    let rendered = format!("# TYPE {name} counter\n{name} {value}\n");
    output.push_str(&rendered);
}
398
/// Tunable size, concurrency and retention limits for the server.
///
/// The admission-related fields are copied into `AdmissionConfig` and
/// `trace_retention` is handed to `TraceRegistry::new` when a `ServerRuntime` is
/// built. Enforcement of the remaining limits lives outside this definition.
#[derive(Debug, Clone)]
pub struct ServerLimits {
    /// Upper bound on HTTP body size, in bytes.
    pub max_http_body_bytes: usize,
    /// Upper bound on the number of requests in one batch upsert.
    pub max_batch_upsert_requests: usize,
    /// Upper bound on items requested by one recall.
    pub max_recall_items: usize,
    /// Upper bound on recall query text size, in bytes.
    pub max_query_text_bytes: usize,
    /// Upper bound on record content size, in bytes.
    pub max_record_content_bytes: usize,
    /// Upper bound on labels attached to one scope.
    pub max_labels_per_scope: usize,
    /// Forwarded to `AdmissionConfig::max_inflight_reads`.
    pub max_inflight_reads: usize,
    /// Forwarded to `AdmissionConfig::max_inflight_writes`.
    pub max_inflight_writes: usize,
    /// Forwarded to `AdmissionConfig::max_inflight_admin`.
    pub max_inflight_admin: usize,
    /// Forwarded to `AdmissionConfig::max_queued_requests`.
    pub max_queued_requests: usize,
    /// Forwarded to `AdmissionConfig::max_tenant_inflight`.
    pub max_tenant_inflight: usize,
    /// Forwarded to `AdmissionConfig::queue_wait_timeout_ms` (milliseconds).
    pub queue_wait_timeout_ms: u64,
    /// Retention setting handed to the trace registry.
    pub trace_retention: usize,
}

impl Default for ServerLimits {
    /// Returns the built-in default limits.
    fn default() -> Self {
        const KIB: usize = 1024;

        Self {
            // Payload size limits.
            max_http_body_bytes: 64 * KIB,
            max_record_content_bytes: 32 * KIB,
            max_query_text_bytes: 4 * KIB,
            // Per-request cardinality limits.
            max_batch_upsert_requests: 128,
            max_recall_items: 64,
            max_labels_per_scope: 32,
            // Admission control.
            max_inflight_reads: 64,
            max_inflight_writes: 32,
            max_inflight_admin: 8,
            max_queued_requests: 256,
            max_tenant_inflight: 16,
            queue_wait_timeout_ms: 2_000,
            // Observability.
            trace_retention: 256,
        }
    }
}
435
436#[derive(Debug, Clone, Default)]
437pub struct AuthConfig {
438 pub bearer_token: Option<String>,
439 pub protect_metrics: bool,
440 pub token_policies: Vec<TokenPolicy>,
441}
442
/// Permission a token can be granted through a `TokenPolicy`.
///
/// NOTE(review): the mapping from routes to permissions is decided by the callers of
/// `AuthConfig::authorize`, outside this definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuthPermission {
    /// Read permission.
    Read,
    /// Write permission.
    Write,
    /// Admin permission.
    Admin,
    /// Metrics permission.
    Metrics,
}
450
451#[derive(Debug, Clone, PartialEq, Eq)]
452pub struct TokenPolicy {
453 pub token: String,
454 pub permissions: Vec<AuthPermission>,
455}
456
457impl AuthConfig {
458 fn has_authentication(&self) -> bool {
459 self.bearer_token.is_some() || !self.token_policies.is_empty()
460 }
461
462 fn authorize(&self, authorization_header: Option<&str>, permission: AuthPermission) -> bool {
463 if !self.has_authentication() {
464 return true;
465 }
466
467 let Some(token) = bearer_token_from_header(authorization_header) else {
468 return false;
469 };
470
471 if self.bearer_token.as_deref() == Some(token) {
472 return true;
473 }
474
475 self.token_policies
476 .iter()
477 .any(|policy| policy.token == token && policy.permissions.contains(&permission))
478 }
479}
480
481#[derive(Debug, Clone, serde::Deserialize)]
482pub struct DeleteHttpRequest {
483 pub tenant_id: String,
484 pub namespace: String,
485 pub record_id: String,
486 pub hard_delete: bool,
487 pub audit_reason: String,
488}
489
490#[derive(Debug, Clone, serde::Deserialize)]
491pub struct ArchiveHttpRequest {
492 pub tenant_id: String,
493 pub namespace: String,
494 pub record_id: String,
495 pub dry_run: bool,
496 pub audit_reason: String,
497}
498
499#[derive(Debug, Clone, serde::Deserialize)]
500pub struct SuppressHttpRequest {
501 pub tenant_id: String,
502 pub namespace: String,
503 pub record_id: String,
504 pub dry_run: bool,
505 pub audit_reason: String,
506}
507
508#[derive(Debug, Clone, serde::Deserialize)]
509pub struct RecoverHttpRequest {
510 pub tenant_id: String,
511 pub namespace: String,
512 pub record_id: String,
513 pub dry_run: bool,
514 pub audit_reason: String,
515 pub quality_state: MemoryQualityState,
516 pub historical_state: Option<MemoryHistoricalState>,
517}
518
519pub struct GrpcMemoryService<S> {
520 store: Arc<S>,
521 runtime: ServerRuntime,
522}
523
524impl<S> Clone for GrpcMemoryService<S> {
525 fn clone(&self) -> Self {
526 Self {
527 store: Arc::clone(&self.store),
528 runtime: self.runtime.clone(),
529 }
530 }
531}
532
533#[derive(Debug)]
534struct ServerRuntimeInner {
535 backend: Arc<String>,
536 limits: Arc<ServerLimits>,
537 metrics: Arc<ServerMetrics>,
538 auth: Arc<AuthConfig>,
539 traces: Arc<TraceRegistry>,
540 admission: Arc<AdmissionController>,
541}
542
543#[derive(Debug, Clone)]
544pub struct ServerRuntime {
545 inner: Arc<ServerRuntimeInner>,
546}
547
548impl ServerRuntime {
549 pub fn new(
550 backend: impl Into<String>,
551 limits: ServerLimits,
552 metrics: Arc<ServerMetrics>,
553 auth: AuthConfig,
554 ) -> Self {
555 let admission = AdmissionController::new(AdmissionConfig {
556 max_inflight_reads: limits.max_inflight_reads,
557 max_inflight_writes: limits.max_inflight_writes,
558 max_inflight_admin: limits.max_inflight_admin,
559 max_queued_requests: limits.max_queued_requests,
560 max_tenant_inflight: limits.max_tenant_inflight,
561 queue_wait_timeout_ms: limits.queue_wait_timeout_ms,
562 });
563 Self {
564 inner: Arc::new(ServerRuntimeInner {
565 backend: Arc::new(backend.into()),
566 traces: Arc::new(TraceRegistry::new(limits.trace_retention)),
567 admission: Arc::new(admission),
568 auth: Arc::new(auth),
569 metrics,
570 limits: Arc::new(limits),
571 }),
572 }
573 }
574
575 fn limits(&self) -> &Arc<ServerLimits> {
576 &self.inner.limits
577 }
578
579 fn metrics(&self) -> &Arc<ServerMetrics> {
580 &self.inner.metrics
581 }
582
583 fn backend(&self) -> &Arc<String> {
584 &self.inner.backend
585 }
586
587 fn auth(&self) -> &Arc<AuthConfig> {
588 &self.inner.auth
589 }
590
591 fn traces(&self) -> &Arc<TraceRegistry> {
592 &self.inner.traces
593 }
594
595 fn admission(&self) -> &Arc<AdmissionController> {
596 &self.inner.admission
597 }
598}
599
600impl<S: MemoryStore> GrpcMemoryService<S> {
601 pub fn new(store: Arc<S>) -> Self {
602 let backend = store.as_ref().backend_kind().to_string();
603 Self::with_runtime(
604 store,
605 ServerRuntime::new(
606 backend,
607 ServerLimits::default(),
608 Arc::new(ServerMetrics::default()),
609 AuthConfig::default(),
610 ),
611 )
612 }
613
614 pub fn with_limits(store: Arc<S>, limits: ServerLimits) -> Self {
615 let backend = store.as_ref().backend_kind().to_string();
616 Self::with_runtime(
617 store,
618 ServerRuntime::new(
619 backend,
620 limits,
621 Arc::new(ServerMetrics::default()),
622 AuthConfig::default(),
623 ),
624 )
625 }
626
627 pub fn with_observability(
628 store: Arc<S>,
629 limits: ServerLimits,
630 metrics: Arc<ServerMetrics>,
631 ) -> Self {
632 let backend = store.as_ref().backend_kind().to_string();
633 Self::with_runtime(
634 store,
635 ServerRuntime::new(backend, limits, metrics, AuthConfig::default()),
636 )
637 }
638
639 pub fn with_runtime_config(
640 store: Arc<S>,
641 limits: ServerLimits,
642 metrics: Arc<ServerMetrics>,
643 auth: AuthConfig,
644 ) -> Self {
645 let backend = store.as_ref().backend_kind().to_string();
646 Self::with_runtime(store, ServerRuntime::new(backend, limits, metrics, auth))
647 }
648
649 pub fn with_runtime(store: Arc<S>, runtime: ServerRuntime) -> Self {
650 Self { store, runtime }
651 }
652
653 pub fn into_service(self) -> MemoryServiceServer<Self>
654 where
655 Self: MemoryService,
656 {
657 MemoryServiceServer::new(self)
658 }
659}
660
661pub async fn serve<S>(
662 addr: SocketAddr,
663 store: Arc<S>,
664 limits: ServerLimits,
665 auth: AuthConfig,
666) -> Result<(), tonic::transport::Error>
667where
668 S: MemoryStore + 'static,
669{
670 let backend = store.as_ref().backend_kind().to_string();
671 serve_with_runtime(
672 addr,
673 store,
674 ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
675 )
676 .await
677}
678
679pub async fn serve_with_runtime<S>(
680 addr: SocketAddr,
681 store: Arc<S>,
682 runtime: ServerRuntime,
683) -> Result<(), tonic::transport::Error>
684where
685 S: MemoryStore + 'static,
686{
687 tonic::transport::Server::builder()
688 .add_service(GrpcMemoryService::with_runtime(store, runtime).into_service())
689 .serve(addr)
690 .await
691}
692
693pub fn http_app<S>(store: Arc<S>, limits: ServerLimits, auth: AuthConfig) -> Router
694where
695 S: MemoryStore + 'static,
696{
697 let backend = store.as_ref().backend_kind().to_string();
698 http_app_with_runtime(
699 store,
700 ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
701 )
702}
703
704pub fn http_app_with_metrics<S>(
705 store: Arc<S>,
706 limits: ServerLimits,
707 metrics: Arc<ServerMetrics>,
708 auth: AuthConfig,
709) -> Router
710where
711 S: MemoryStore + 'static,
712{
713 let backend = store.as_ref().backend_kind().to_string();
714 http_app_with_runtime(store, ServerRuntime::new(backend, limits, metrics, auth))
715}
716
717pub fn http_app_with_runtime<S>(store: Arc<S>, runtime: ServerRuntime) -> Router
718where
719 S: MemoryStore + 'static,
720{
721 let limits = Arc::clone(runtime.limits());
722 let ready_store = Arc::clone(&store);
723 let upsert_store = Arc::clone(&store);
724 let batch_upsert_store = Arc::clone(&store);
725 let recall_store = Arc::clone(&store);
726 let snapshot_store = Arc::clone(&store);
727 let stats_store = Arc::clone(&store);
728 let integrity_store = Arc::clone(&store);
729 let repair_store = Arc::clone(&store);
730 let compact_store = Arc::clone(&store);
731 let delete_store = Arc::clone(&store);
732 let archive_store = Arc::clone(&store);
733 let suppress_store = Arc::clone(&store);
734 let recover_store = Arc::clone(&store);
735 let export_store = Arc::clone(&store);
736 let import_store = Arc::clone(&store);
737 let ready_runtime = runtime.clone();
738 let upsert_runtime = runtime.clone();
739 let batch_upsert_runtime = runtime.clone();
740 let recall_runtime = runtime.clone();
741 let snapshot_runtime = runtime.clone();
742 let stats_runtime = runtime.clone();
743 let integrity_runtime = runtime.clone();
744 let repair_runtime = runtime.clone();
745 let compact_runtime = runtime.clone();
746 let delete_runtime = runtime.clone();
747 let archive_runtime = runtime.clone();
748 let suppress_runtime = runtime.clone();
749 let recover_runtime = runtime.clone();
750 let traces_runtime = runtime.clone();
751 let trace_runtime = runtime.clone();
752 let export_runtime = runtime.clone();
753 let import_runtime = runtime.clone();
754 let runtime_status_runtime = runtime.clone();
755 let metrics_metrics = Arc::clone(runtime.metrics());
756 let middleware_limits = Arc::clone(&limits);
757 let middleware_metrics = Arc::clone(runtime.metrics());
758 let middleware_auth = Arc::clone(runtime.auth());
759
760 let health_metrics = Arc::clone(runtime.metrics());
761
762 Router::new()
763 .route(
764 "/healthz",
765 get(move || {
766 let metrics = Arc::clone(&health_metrics);
767 async move { healthz(metrics).await }
768 }),
769 )
770 .route(
771 "/readyz",
772 get(move || {
773 let store = Arc::clone(&ready_store);
774 let runtime = ready_runtime.clone();
775 async move { readyz(store, runtime).await }
776 }),
777 )
778 .route(
779 "/memory/upsert",
780 post(
781 move |headers: HeaderMap, Json(request): Json<UpsertRequest>| {
782 let store = Arc::clone(&upsert_store);
783 let runtime = upsert_runtime.clone();
784 async move { upsert_http(store, request, headers, runtime).await }
785 },
786 ),
787 )
788 .route(
789 "/memory/batch-upsert",
790 post(
791 move |headers: HeaderMap, Json(request): Json<BatchUpsertRequest>| {
792 let store = Arc::clone(&batch_upsert_store);
793 let runtime = batch_upsert_runtime.clone();
794 async move { batch_upsert_http(store, request, headers, runtime).await }
795 },
796 ),
797 )
798 .route(
799 "/memory/recall",
800 post(
801 move |headers: HeaderMap, Json(request): Json<RecallQuery>| {
802 let store = Arc::clone(&recall_store);
803 let runtime = recall_runtime.clone();
804 async move { recall_http(store, request, headers, runtime).await }
805 },
806 ),
807 )
808 .route(
809 "/admin/snapshot",
810 get(move |headers: HeaderMap| {
811 let store = Arc::clone(&snapshot_store);
812 let runtime = snapshot_runtime.clone();
813 async move { snapshot_http(store, headers, runtime).await }
814 }),
815 )
816 .route(
817 "/admin/stats",
818 get(
819 move |headers: HeaderMap, Query(request): Query<StoreStatsRequest>| {
820 let store = Arc::clone(&stats_store);
821 let runtime = stats_runtime.clone();
822 async move { stats_http(store, request, headers, runtime).await }
823 },
824 ),
825 )
826 .route(
827 "/admin/integrity",
828 get(
829 move |headers: HeaderMap, Query(request): Query<IntegrityCheckRequest>| {
830 let store = Arc::clone(&integrity_store);
831 let runtime = integrity_runtime.clone();
832 async move { integrity_http(store, request, headers, runtime).await }
833 },
834 ),
835 )
836 .route(
837 "/admin/repair",
838 post(
839 move |headers: HeaderMap, Json(request): Json<RepairRequest>| {
840 let store = Arc::clone(&repair_store);
841 let runtime = repair_runtime.clone();
842 async move { repair_http(store, request, headers, runtime).await }
843 },
844 ),
845 )
846 .route(
847 "/admin/compact",
848 post(
849 move |headers: HeaderMap, Json(request): Json<CompactionRequest>| {
850 let store = Arc::clone(&compact_store);
851 let runtime = compact_runtime.clone();
852 async move { compact_http(store, request, headers, runtime).await }
853 },
854 ),
855 )
856 .route(
857 "/admin/delete",
858 post(
859 move |headers: HeaderMap, Json(request): Json<DeleteHttpRequest>| {
860 let store = Arc::clone(&delete_store);
861 let runtime = delete_runtime.clone();
862 async move { delete_http(store, request, headers, runtime).await }
863 },
864 ),
865 )
866 .route(
867 "/admin/archive",
868 post(
869 move |headers: HeaderMap, Json(request): Json<ArchiveHttpRequest>| {
870 let store = Arc::clone(&archive_store);
871 let runtime = archive_runtime.clone();
872 async move { archive_http(store, request, headers, runtime).await }
873 },
874 ),
875 )
876 .route(
877 "/admin/suppress",
878 post(
879 move |headers: HeaderMap, Json(request): Json<SuppressHttpRequest>| {
880 let store = Arc::clone(&suppress_store);
881 let runtime = suppress_runtime.clone();
882 async move { suppress_http(store, request, headers, runtime).await }
883 },
884 ),
885 )
886 .route(
887 "/admin/recover",
888 post(
889 move |headers: HeaderMap, Json(request): Json<RecoverHttpRequest>| {
890 let store = Arc::clone(&recover_store);
891 let runtime = recover_runtime.clone();
892 async move { recover_http(store, request, headers, runtime).await }
893 },
894 ),
895 )
896 .route(
897 "/admin/traces",
898 get(move |Query(request): Query<TraceListRequest>| {
899 let runtime = traces_runtime.clone();
900 async move { traces_http(request, runtime).await }
901 }),
902 )
903 .route(
904 "/admin/traces/{trace_id}",
905 get(move |Path(trace_id): Path<String>| {
906 let runtime = trace_runtime.clone();
907 async move { trace_http(trace_id, runtime).await }
908 }),
909 )
910 .route(
911 "/admin/runtime",
912 get(move || {
913 let runtime = runtime_status_runtime.clone();
914 async move { runtime_status_http(runtime).await }
915 }),
916 )
917 .route(
918 "/admin/export",
919 post(
920 move |headers: HeaderMap, Json(request): Json<ExportRequest>| {
921 let store = Arc::clone(&export_store);
922 let runtime = export_runtime.clone();
923 async move { export_http(store, request, headers, runtime).await }
924 },
925 ),
926 )
927 .route(
928 "/admin/import",
929 post(
930 move |headers: HeaderMap, Json(request): Json<ImportRequest>| {
931 let store = Arc::clone(&import_store);
932 let runtime = import_runtime.clone();
933 async move { import_http(store, request, headers, runtime).await }
934 },
935 ),
936 )
937 .route(
938 "/metrics",
939 get(move || {
940 let metrics = Arc::clone(&metrics_metrics);
941 async move { metrics_http(metrics).await }
942 }),
943 )
944 .layer(middleware::from_fn(move |request, next| {
945 let limits = Arc::clone(&middleware_limits);
946 let metrics = Arc::clone(&middleware_metrics);
947 let auth = Arc::clone(&middleware_auth);
948 async move { enforce_http_guardrails(request, next, limits, metrics, auth).await }
949 }))
950}
951
952pub async fn serve_http<S>(
953 addr: SocketAddr,
954 store: Arc<S>,
955 limits: ServerLimits,
956 auth: AuthConfig,
957) -> std::io::Result<()>
958where
959 S: MemoryStore + 'static,
960{
961 let backend = store.as_ref().backend_kind().to_string();
962 serve_http_with_runtime(
963 addr,
964 store,
965 ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
966 )
967 .await
968}
969
970pub async fn serve_http_with_runtime<S>(
971 addr: SocketAddr,
972 store: Arc<S>,
973 runtime: ServerRuntime,
974) -> std::io::Result<()>
975where
976 S: MemoryStore + 'static,
977{
978 let listener = tokio::net::TcpListener::bind(addr).await?;
979 axum::serve(listener, http_app_with_runtime(store, runtime)).await
980}
981
982async fn healthz(metrics: Arc<ServerMetrics>) -> Json<HealthStatus> {
983 metrics.http_healthz.fetch_add(1, Ordering::Relaxed);
984 Json(HealthStatus { status: "ok" })
985}
986
987async fn readyz<S>(
988 store: Arc<S>,
989 runtime: ServerRuntime,
990) -> Result<Json<ReadyStatus>, (StatusCode, Json<HttpErrorBody>)>
991where
992 S: MemoryStore + 'static,
993{
994 runtime
995 .metrics()
996 .http_readyz
997 .fetch_add(1, Ordering::Relaxed);
998 let manifest = store.snapshot().await.map_err(map_http_store_error)?;
999 Ok(Json(ReadyStatus {
1000 status: "ready",
1001 record_count: manifest.record_count,
1002 namespaces: manifest.namespaces,
1003 }))
1004}
1005
1006async fn upsert_http<S>(
1007 store: Arc<S>,
1008 request: UpsertRequest,
1009 headers: HeaderMap,
1010 runtime: ServerRuntime,
1011) -> Result<Json<UpsertReceipt>, (StatusCode, Json<HttpErrorBody>)>
1012where
1013 S: MemoryStore + 'static,
1014{
1015 let started_at_unix_ms = now_unix_ms();
1016 let correlation_id = runtime.traces().next_id("corr");
1017 let _permit = runtime
1018 .admission()
1019 .acquire(AdmissionClass::Write, Some(&request.record.scope.tenant_id))
1020 .await
1021 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1022 let receipt = store
1023 .upsert(request.clone())
1024 .await
1025 .map_err(map_http_store_error)?;
1026 record_trace(
1027 &runtime,
1028 TraceOperationKind::Upsert,
1029 "http",
1030 Some(request.record.scope.tenant_id),
1031 Some(request.record.scope.namespace),
1032 http_principal(&headers),
1033 started_at_unix_ms,
1034 TraceStatus::Ok,
1035 None,
1036 OperationTraceSummary {
1037 record_id: Some(request.record.id),
1038 ..OperationTraceSummary::default()
1039 },
1040 None,
1041 correlation_id,
1042 );
1043 Ok(Json(receipt))
1044}
1045
1046async fn batch_upsert_http<S>(
1047 store: Arc<S>,
1048 request: BatchUpsertRequest,
1049 headers: HeaderMap,
1050 runtime: ServerRuntime,
1051) -> Result<Json<Vec<UpsertReceipt>>, (StatusCode, Json<HttpErrorBody>)>
1052where
1053 S: MemoryStore + 'static,
1054{
1055 let started_at_unix_ms = now_unix_ms();
1056 let correlation_id = runtime.traces().next_id("corr");
1057 let tenant_id = request
1058 .requests
1059 .first()
1060 .map(|item| item.record.scope.tenant_id.clone());
1061 let namespace = request
1062 .requests
1063 .first()
1064 .map(|item| item.record.scope.namespace.clone());
1065 let _permit = runtime
1066 .admission()
1067 .acquire(AdmissionClass::Write, tenant_id.as_deref())
1068 .await
1069 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1070 let receipts = store
1071 .batch_upsert(request.clone())
1072 .await
1073 .map_err(map_http_store_error)?;
1074 record_trace(
1075 &runtime,
1076 TraceOperationKind::BatchUpsert,
1077 "http",
1078 tenant_id,
1079 namespace,
1080 http_principal(&headers),
1081 started_at_unix_ms,
1082 TraceStatus::Ok,
1083 None,
1084 OperationTraceSummary {
1085 request_count: Some(request.requests.len() as u32),
1086 ..OperationTraceSummary::default()
1087 },
1088 None,
1089 correlation_id,
1090 );
1091 Ok(Json(receipts))
1092}
1093
1094async fn recall_http<S>(
1095 store: Arc<S>,
1096 request: RecallQuery,
1097 headers: HeaderMap,
1098 runtime: ServerRuntime,
1099) -> Result<Json<RecallResult>, (StatusCode, Json<HttpErrorBody>)>
1100where
1101 S: MemoryStore + 'static,
1102{
1103 let started_at_unix_ms = now_unix_ms();
1104 let correlation_id = runtime.traces().next_id("corr");
1105 let _permit = runtime
1106 .admission()
1107 .acquire(AdmissionClass::Read, Some(&request.scope.tenant_id))
1108 .await
1109 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1110 let mut result = store
1111 .recall(request.clone())
1112 .await
1113 .map_err(map_http_store_error)?;
1114 attach_correlation_id(&mut result, &correlation_id);
1115 record_trace(
1116 &runtime,
1117 TraceOperationKind::Recall,
1118 "http",
1119 Some(request.scope.tenant_id),
1120 Some(request.scope.namespace),
1121 http_principal(&headers),
1122 started_at_unix_ms,
1123 TraceStatus::Ok,
1124 None,
1125 OperationTraceSummary {
1126 query_text: Some(request.query_text),
1127 max_items: Some(request.max_items as u32),
1128 token_budget: request.token_budget.map(|value| value as u32),
1129 ..OperationTraceSummary::default()
1130 },
1131 result.explanation.clone(),
1132 correlation_id,
1133 );
1134 Ok(Json(result))
1135}
1136
1137async fn snapshot_http<S>(
1138 store: Arc<S>,
1139 headers: HeaderMap,
1140 runtime: ServerRuntime,
1141) -> Result<Json<SnapshotManifest>, (StatusCode, Json<HttpErrorBody>)>
1142where
1143 S: MemoryStore + 'static,
1144{
1145 runtime
1146 .metrics()
1147 .http_snapshot
1148 .fetch_add(1, Ordering::Relaxed);
1149 let started_at_unix_ms = now_unix_ms();
1150 let correlation_id = runtime.traces().next_id("corr");
1151 let _permit = runtime
1152 .admission()
1153 .acquire(AdmissionClass::Admin, None)
1154 .await
1155 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1156 let manifest = store.snapshot().await.map_err(map_http_store_error)?;
1157 record_trace(
1158 &runtime,
1159 TraceOperationKind::Snapshot,
1160 "http",
1161 None,
1162 None,
1163 http_principal(&headers),
1164 started_at_unix_ms,
1165 TraceStatus::Ok,
1166 None,
1167 OperationTraceSummary::default(),
1168 None,
1169 correlation_id,
1170 );
1171 Ok(Json(manifest))
1172}
1173
1174async fn stats_http<S>(
1175 store: Arc<S>,
1176 request: StoreStatsRequest,
1177 headers: HeaderMap,
1178 runtime: ServerRuntime,
1179) -> Result<Json<StoreStatsReport>, (StatusCode, Json<HttpErrorBody>)>
1180where
1181 S: MemoryStore + 'static,
1182{
1183 let started_at_unix_ms = now_unix_ms();
1184 let correlation_id = runtime.traces().next_id("corr");
1185 let _permit = runtime
1186 .admission()
1187 .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1188 .await
1189 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1190 let report = store
1191 .stats(request.clone())
1192 .await
1193 .map_err(map_http_store_error)?;
1194 record_trace(
1195 &runtime,
1196 TraceOperationKind::Stats,
1197 "http",
1198 request.tenant_id,
1199 request.namespace,
1200 http_principal(&headers),
1201 started_at_unix_ms,
1202 TraceStatus::Ok,
1203 None,
1204 OperationTraceSummary::default(),
1205 None,
1206 correlation_id,
1207 );
1208 Ok(Json(report))
1209}
1210
1211async fn integrity_http<S>(
1212 store: Arc<S>,
1213 request: IntegrityCheckRequest,
1214 headers: HeaderMap,
1215 runtime: ServerRuntime,
1216) -> Result<Json<IntegrityCheckReport>, (StatusCode, Json<HttpErrorBody>)>
1217where
1218 S: MemoryStore + 'static,
1219{
1220 let started_at_unix_ms = now_unix_ms();
1221 let correlation_id = runtime.traces().next_id("corr");
1222 let _permit = runtime
1223 .admission()
1224 .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1225 .await
1226 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1227 let report = store
1228 .integrity_check(request.clone())
1229 .await
1230 .map_err(map_http_store_error)?;
1231 record_trace(
1232 &runtime,
1233 TraceOperationKind::IntegrityCheck,
1234 "http",
1235 request.tenant_id,
1236 request.namespace,
1237 http_principal(&headers),
1238 started_at_unix_ms,
1239 TraceStatus::Ok,
1240 None,
1241 OperationTraceSummary::default(),
1242 None,
1243 correlation_id,
1244 );
1245 Ok(Json(report))
1246}
1247
1248async fn repair_http<S>(
1249 store: Arc<S>,
1250 request: RepairRequest,
1251 headers: HeaderMap,
1252 runtime: ServerRuntime,
1253) -> Result<Json<RepairReport>, (StatusCode, Json<HttpErrorBody>)>
1254where
1255 S: MemoryStore + 'static,
1256{
1257 let started_at_unix_ms = now_unix_ms();
1258 let correlation_id = runtime.traces().next_id("corr");
1259 let _permit = runtime
1260 .admission()
1261 .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1262 .await
1263 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1264 let report = store
1265 .repair(request.clone())
1266 .await
1267 .map_err(map_http_store_error)?;
1268 record_trace(
1269 &runtime,
1270 TraceOperationKind::Repair,
1271 "http",
1272 request.tenant_id,
1273 request.namespace,
1274 http_principal(&headers),
1275 started_at_unix_ms,
1276 TraceStatus::Ok,
1277 None,
1278 OperationTraceSummary {
1279 dry_run: Some(request.dry_run),
1280 ..OperationTraceSummary::default()
1281 },
1282 None,
1283 correlation_id,
1284 );
1285 Ok(Json(report))
1286}
1287
1288async fn compact_http<S>(
1289 store: Arc<S>,
1290 request: CompactionRequest,
1291 headers: HeaderMap,
1292 runtime: ServerRuntime,
1293) -> Result<Json<CompactionReport>, (StatusCode, Json<HttpErrorBody>)>
1294where
1295 S: MemoryStore + 'static,
1296{
1297 runtime
1298 .metrics()
1299 .http_compact
1300 .fetch_add(1, Ordering::Relaxed);
1301 let started_at_unix_ms = now_unix_ms();
1302 let correlation_id = runtime.traces().next_id("corr");
1303 let _permit = runtime
1304 .admission()
1305 .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1306 .await
1307 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1308 let report = store
1309 .compact(request.clone())
1310 .await
1311 .map_err(map_http_store_error)?;
1312 record_trace(
1313 &runtime,
1314 TraceOperationKind::Compact,
1315 "http",
1316 Some(request.tenant_id),
1317 request.namespace,
1318 http_principal(&headers),
1319 started_at_unix_ms,
1320 TraceStatus::Ok,
1321 None,
1322 OperationTraceSummary {
1323 dry_run: Some(request.dry_run),
1324 ..OperationTraceSummary::default()
1325 },
1326 None,
1327 correlation_id,
1328 );
1329 Ok(Json(report))
1330}
1331
1332async fn delete_http<S>(
1333 store: Arc<S>,
1334 request: DeleteHttpRequest,
1335 headers: HeaderMap,
1336 runtime: ServerRuntime,
1337) -> Result<Json<DeleteReceipt>, (StatusCode, Json<HttpErrorBody>)>
1338where
1339 S: MemoryStore + 'static,
1340{
1341 runtime
1342 .metrics()
1343 .http_delete
1344 .fetch_add(1, Ordering::Relaxed);
1345 let started_at_unix_ms = now_unix_ms();
1346 let correlation_id = runtime.traces().next_id("corr");
1347 let _permit = runtime
1348 .admission()
1349 .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1350 .await
1351 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1352 let receipt = store
1353 .delete(DeleteRequest {
1354 tenant_id: request.tenant_id.clone(),
1355 namespace: request.namespace.clone(),
1356 record_id: request.record_id,
1357 hard_delete: request.hard_delete,
1358 audit_reason: request.audit_reason,
1359 })
1360 .await
1361 .map_err(map_http_store_error)?;
1362 record_trace(
1363 &runtime,
1364 TraceOperationKind::Delete,
1365 "http",
1366 Some(request.tenant_id),
1367 Some(request.namespace),
1368 http_principal(&headers),
1369 started_at_unix_ms,
1370 TraceStatus::Ok,
1371 None,
1372 OperationTraceSummary {
1373 record_id: Some(receipt.record_id.clone()),
1374 ..OperationTraceSummary::default()
1375 },
1376 None,
1377 correlation_id,
1378 );
1379 Ok(Json(receipt))
1380}
1381
1382async fn archive_http<S>(
1383 store: Arc<S>,
1384 request: ArchiveHttpRequest,
1385 headers: HeaderMap,
1386 runtime: ServerRuntime,
1387) -> Result<Json<ArchiveReceipt>, (StatusCode, Json<HttpErrorBody>)>
1388where
1389 S: MemoryStore + 'static,
1390{
1391 runtime
1392 .metrics()
1393 .http_archive
1394 .fetch_add(1, Ordering::Relaxed);
1395 let started_at_unix_ms = now_unix_ms();
1396 let correlation_id = runtime.traces().next_id("corr");
1397 let _permit = runtime
1398 .admission()
1399 .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1400 .await
1401 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1402 let receipt = store
1403 .archive(ArchiveRequest {
1404 tenant_id: request.tenant_id.clone(),
1405 namespace: request.namespace.clone(),
1406 record_id: request.record_id,
1407 dry_run: request.dry_run,
1408 audit_reason: request.audit_reason,
1409 })
1410 .await
1411 .map_err(map_http_store_error)?;
1412 record_trace(
1413 &runtime,
1414 TraceOperationKind::Archive,
1415 "http",
1416 Some(request.tenant_id),
1417 Some(request.namespace),
1418 http_principal(&headers),
1419 started_at_unix_ms,
1420 TraceStatus::Ok,
1421 None,
1422 OperationTraceSummary {
1423 record_id: Some(receipt.record_id.clone()),
1424 dry_run: Some(receipt.dry_run),
1425 ..OperationTraceSummary::default()
1426 },
1427 None,
1428 correlation_id,
1429 );
1430 Ok(Json(receipt))
1431}
1432
1433async fn suppress_http<S>(
1434 store: Arc<S>,
1435 request: SuppressHttpRequest,
1436 headers: HeaderMap,
1437 runtime: ServerRuntime,
1438) -> Result<Json<SuppressReceipt>, (StatusCode, Json<HttpErrorBody>)>
1439where
1440 S: MemoryStore + 'static,
1441{
1442 runtime
1443 .metrics()
1444 .http_suppress
1445 .fetch_add(1, Ordering::Relaxed);
1446 let started_at_unix_ms = now_unix_ms();
1447 let correlation_id = runtime.traces().next_id("corr");
1448 let _permit = runtime
1449 .admission()
1450 .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1451 .await
1452 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1453 let receipt = store
1454 .suppress(SuppressRequest {
1455 tenant_id: request.tenant_id.clone(),
1456 namespace: request.namespace.clone(),
1457 record_id: request.record_id,
1458 dry_run: request.dry_run,
1459 audit_reason: request.audit_reason,
1460 })
1461 .await
1462 .map_err(map_http_store_error)?;
1463 record_trace(
1464 &runtime,
1465 TraceOperationKind::Suppress,
1466 "http",
1467 Some(request.tenant_id),
1468 Some(request.namespace),
1469 http_principal(&headers),
1470 started_at_unix_ms,
1471 TraceStatus::Ok,
1472 None,
1473 OperationTraceSummary {
1474 record_id: Some(receipt.record_id.clone()),
1475 dry_run: Some(receipt.dry_run),
1476 ..OperationTraceSummary::default()
1477 },
1478 None,
1479 correlation_id,
1480 );
1481 Ok(Json(receipt))
1482}
1483
1484async fn recover_http<S>(
1485 store: Arc<S>,
1486 request: RecoverHttpRequest,
1487 headers: HeaderMap,
1488 runtime: ServerRuntime,
1489) -> Result<Json<RecoverReceipt>, (StatusCode, Json<HttpErrorBody>)>
1490where
1491 S: MemoryStore + 'static,
1492{
1493 runtime
1494 .metrics()
1495 .http_recover
1496 .fetch_add(1, Ordering::Relaxed);
1497 let started_at_unix_ms = now_unix_ms();
1498 let correlation_id = runtime.traces().next_id("corr");
1499 let _permit = runtime
1500 .admission()
1501 .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1502 .await
1503 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1504 let receipt = store
1505 .recover(RecoverRequest {
1506 tenant_id: request.tenant_id.clone(),
1507 namespace: request.namespace.clone(),
1508 record_id: request.record_id,
1509 dry_run: request.dry_run,
1510 audit_reason: request.audit_reason,
1511 quality_state: request.quality_state,
1512 historical_state: request.historical_state,
1513 })
1514 .await
1515 .map_err(map_http_store_error)?;
1516 record_trace(
1517 &runtime,
1518 TraceOperationKind::Recover,
1519 "http",
1520 Some(request.tenant_id),
1521 Some(request.namespace),
1522 http_principal(&headers),
1523 started_at_unix_ms,
1524 TraceStatus::Ok,
1525 None,
1526 OperationTraceSummary {
1527 record_id: Some(receipt.record_id.clone()),
1528 dry_run: Some(receipt.dry_run),
1529 ..OperationTraceSummary::default()
1530 },
1531 None,
1532 correlation_id,
1533 );
1534 Ok(Json(receipt))
1535}
1536
1537async fn traces_http(
1538 request: TraceListRequest,
1539 runtime: ServerRuntime,
1540) -> Result<Json<Vec<OperationTrace>>, (StatusCode, Json<HttpErrorBody>)> {
1541 runtime
1542 .metrics()
1543 .http_traces
1544 .fetch_add(1, Ordering::Relaxed);
1545 runtime
1546 .metrics()
1547 .trace_reads
1548 .fetch_add(1, Ordering::Relaxed);
1549 Ok(Json(runtime.traces().list(&request)))
1550}
1551
1552async fn trace_http(
1553 trace_id: String,
1554 runtime: ServerRuntime,
1555) -> Result<Json<OperationTrace>, (StatusCode, Json<HttpErrorBody>)> {
1556 runtime
1557 .metrics()
1558 .http_traces
1559 .fetch_add(1, Ordering::Relaxed);
1560 runtime
1561 .metrics()
1562 .trace_reads
1563 .fetch_add(1, Ordering::Relaxed);
1564 runtime.traces().get(&trace_id).map(Json).ok_or_else(|| {
1565 (
1566 StatusCode::NOT_FOUND,
1567 Json(HttpErrorBody {
1568 error: format!("trace {trace_id} not found"),
1569 }),
1570 )
1571 })
1572}
1573
1574async fn runtime_status_http(runtime: ServerRuntime) -> Json<ServerRuntimeStatus> {
1575 runtime
1576 .metrics()
1577 .http_runtime
1578 .fetch_add(1, Ordering::Relaxed);
1579 Json(ServerRuntimeStatus {
1580 backend: runtime.backend().as_ref().clone(),
1581 admission: runtime.admission().snapshot(),
1582 traces: runtime.traces().snapshot(),
1583 })
1584}
1585
1586async fn export_http<S>(
1587 store: Arc<S>,
1588 request: ExportRequest,
1589 headers: HeaderMap,
1590 runtime: ServerRuntime,
1591) -> Result<Json<PortableStorePackage>, (StatusCode, Json<HttpErrorBody>)>
1592where
1593 S: MemoryStore + 'static,
1594{
1595 runtime
1596 .metrics()
1597 .http_export
1598 .fetch_add(1, Ordering::Relaxed);
1599 let started_at_unix_ms = now_unix_ms();
1600 let correlation_id = runtime.traces().next_id("corr");
1601 let _permit = runtime
1602 .admission()
1603 .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1604 .await
1605 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1606 let package = store
1607 .export(request.clone())
1608 .await
1609 .map_err(map_http_store_error)?;
1610 record_trace(
1611 &runtime,
1612 TraceOperationKind::Export,
1613 "http",
1614 request.tenant_id,
1615 request.namespace,
1616 http_principal(&headers),
1617 started_at_unix_ms,
1618 TraceStatus::Ok,
1619 None,
1620 OperationTraceSummary::default(),
1621 None,
1622 correlation_id,
1623 );
1624 Ok(Json(package))
1625}
1626
1627async fn import_http<S>(
1628 store: Arc<S>,
1629 request: ImportRequest,
1630 headers: HeaderMap,
1631 runtime: ServerRuntime,
1632) -> Result<Json<ImportReport>, (StatusCode, Json<HttpErrorBody>)>
1633where
1634 S: MemoryStore + 'static,
1635{
1636 runtime
1637 .metrics()
1638 .http_import
1639 .fetch_add(1, Ordering::Relaxed);
1640 let started_at_unix_ms = now_unix_ms();
1641 let correlation_id = runtime.traces().next_id("corr");
1642 let tenant_id = request
1643 .package
1644 .records
1645 .first()
1646 .map(|entry| entry.record.scope.tenant_id.clone());
1647 let namespace = request
1648 .package
1649 .records
1650 .first()
1651 .map(|entry| entry.record.scope.namespace.clone());
1652 let _permit = runtime
1653 .admission()
1654 .acquire(AdmissionClass::Admin, tenant_id.as_deref())
1655 .await
1656 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1657 let report = store
1658 .import(request.clone())
1659 .await
1660 .map_err(map_http_store_error)?;
1661 record_trace(
1662 &runtime,
1663 TraceOperationKind::Import,
1664 "http",
1665 tenant_id,
1666 namespace,
1667 http_principal(&headers),
1668 started_at_unix_ms,
1669 if report.failed_records.is_empty() {
1670 TraceStatus::Ok
1671 } else {
1672 TraceStatus::Error
1673 },
1674 (!report.failed_records.is_empty())
1675 .then(|| format!("{} import validation failures", report.failed_records.len())),
1676 OperationTraceSummary {
1677 request_count: Some(request.package.records.len() as u32),
1678 dry_run: Some(request.dry_run),
1679 ..OperationTraceSummary::default()
1680 },
1681 None,
1682 correlation_id,
1683 );
1684 Ok(Json(report))
1685}
1686
1687async fn metrics_http(metrics: Arc<ServerMetrics>) -> impl IntoResponse {
1688 metrics.http_metrics.fetch_add(1, Ordering::Relaxed);
1689 (
1690 [(
1691 axum::http::header::CONTENT_TYPE,
1692 "text/plain; version=0.0.4",
1693 )],
1694 metrics.render(),
1695 )
1696}
1697
1698async fn enforce_http_guardrails(
1699 request: AxumRequest,
1700 next: Next,
1701 limits: Arc<ServerLimits>,
1702 metrics: Arc<ServerMetrics>,
1703 auth: Arc<AuthConfig>,
1704) -> Result<axum::response::Response, StatusCode> {
1705 let path = request.uri().path().to_string();
1706 if let Some(content_length) = request.headers().get(axum::http::header::CONTENT_LENGTH)
1707 && let Ok(content_length) = content_length.to_str()
1708 && let Ok(content_length) = content_length.parse::<usize>()
1709 && content_length > limits.max_http_body_bytes
1710 {
1711 metrics
1712 .http_rejected_body_too_large
1713 .fetch_add(1, Ordering::Relaxed);
1714 return Err(StatusCode::PAYLOAD_TOO_LARGE);
1715 }
1716 validate_http_auth(&request, &path, auth.as_ref())?;
1717 Ok(next.run(request).await)
1718}
1719
1720fn http_principal(headers: &HeaderMap) -> Option<String> {
1721 headers
1722 .get(axum::http::header::AUTHORIZATION)
1723 .and_then(|value| value.to_str().ok())
1724 .map(|_| "bearer".to_string())
1725}
1726
1727fn grpc_principal<T>(request: &Request<T>) -> Option<String> {
1728 request
1729 .metadata()
1730 .get("authorization")
1731 .and_then(|value| value.to_str().ok())
1732 .map(|_| "bearer".to_string())
1733}
1734
1735fn map_http_admission_error(
1736 metrics: &ServerMetrics,
1737 error: AdmissionError,
1738) -> (StatusCode, Json<HttpErrorBody>) {
1739 match error {
1740 AdmissionError::QueueFull | AdmissionError::TenantBusy => {
1741 metrics.admission_rejected.fetch_add(1, Ordering::Relaxed);
1742 (
1743 StatusCode::TOO_MANY_REQUESTS,
1744 Json(HttpErrorBody {
1745 error: "request admission rejected".to_string(),
1746 }),
1747 )
1748 }
1749 AdmissionError::TimedOut => {
1750 metrics.admission_timed_out.fetch_add(1, Ordering::Relaxed);
1751 (
1752 StatusCode::TOO_MANY_REQUESTS,
1753 Json(HttpErrorBody {
1754 error: "request admission timed out".to_string(),
1755 }),
1756 )
1757 }
1758 }
1759}
1760
1761fn map_grpc_admission_error(metrics: &ServerMetrics, error: AdmissionError) -> Status {
1762 match error {
1763 AdmissionError::QueueFull | AdmissionError::TenantBusy => {
1764 metrics.admission_rejected.fetch_add(1, Ordering::Relaxed);
1765 Status::resource_exhausted("request admission rejected")
1766 }
1767 AdmissionError::TimedOut => {
1768 metrics.admission_timed_out.fetch_add(1, Ordering::Relaxed);
1769 Status::resource_exhausted("request admission timed out")
1770 }
1771 }
1772}
1773
1774fn attach_correlation_id(result: &mut RecallResult, correlation_id: &str) {
1775 if let Some(explanation) = result.explanation.as_mut() {
1776 if let Some(existing) = explanation.trace_id.replace(correlation_id.to_string()) {
1777 explanation
1778 .policy_notes
1779 .push(format!("planning_trace_id={existing}"));
1780 }
1781 explanation
1782 .policy_notes
1783 .push(format!("correlation_id={correlation_id}"));
1784 }
1785 for hit in &mut result.hits {
1786 if let Some(explanation) = hit.explanation.as_mut() {
1787 if let Some(existing) = explanation.trace_id.replace(correlation_id.to_string()) {
1788 explanation
1789 .policy_notes
1790 .push(format!("planning_trace_id={existing}"));
1791 }
1792 explanation
1793 .policy_notes
1794 .push(format!("correlation_id={correlation_id}"));
1795 }
1796 }
1797}
1798
1799#[allow(clippy::too_many_arguments)]
1800fn record_trace(
1801 runtime: &ServerRuntime,
1802 operation: TraceOperationKind,
1803 transport: &str,
1804 tenant_id: Option<String>,
1805 namespace: Option<String>,
1806 principal: Option<String>,
1807 started_at_unix_ms: u64,
1808 status: TraceStatus,
1809 status_message: Option<String>,
1810 summary: OperationTraceSummary,
1811 recall_explanation: Option<RecallExplanation>,
1812 correlation_id: String,
1813) {
1814 let completed_at_unix_ms = now_unix_ms();
1815 let admission_class = admission_class_for_operation(operation.clone()).to_string();
1816 let planning_trace_id = recall_explanation.as_ref().and_then(|value| {
1817 value
1818 .planning_trace
1819 .as_ref()
1820 .map(|trace| trace.trace_id.clone())
1821 });
1822 let store_span_id = runtime.traces().next_id("store-span");
1823 let evicted = runtime.traces().record(OperationTrace {
1824 trace_id: runtime.traces().next_id("trace"),
1825 correlation_id,
1826 operation,
1827 transport: transport.to_string(),
1828 backend: Some(runtime.backend().as_ref().clone()),
1829 admission_class: Some(admission_class),
1830 tenant_id,
1831 namespace,
1832 principal,
1833 store_span_id: Some(store_span_id),
1834 planning_trace_id,
1835 started_at_unix_ms,
1836 completed_at_unix_ms,
1837 latency_ms: completed_at_unix_ms.saturating_sub(started_at_unix_ms),
1838 status,
1839 status_message,
1840 summary,
1841 recall_explanation,
1842 });
1843 runtime
1844 .metrics()
1845 .trace_records
1846 .fetch_add(1, Ordering::Relaxed);
1847 if evicted {
1848 runtime
1849 .metrics()
1850 .trace_evictions
1851 .fetch_add(1, Ordering::Relaxed);
1852 }
1853}
1854
1855fn admission_class_for_operation(operation: TraceOperationKind) -> &'static str {
1856 match operation {
1857 TraceOperationKind::Recall | TraceOperationKind::Snapshot | TraceOperationKind::Stats => {
1858 "read"
1859 }
1860 TraceOperationKind::Upsert
1861 | TraceOperationKind::BatchUpsert
1862 | TraceOperationKind::Compact
1863 | TraceOperationKind::Delete
1864 | TraceOperationKind::Archive
1865 | TraceOperationKind::Suppress
1866 | TraceOperationKind::Recover => "write",
1867 TraceOperationKind::IntegrityCheck
1868 | TraceOperationKind::Repair
1869 | TraceOperationKind::Export
1870 | TraceOperationKind::Import => "admin",
1871 }
1872}
1873
1874fn validate_http_auth(
1875 request: &AxumRequest,
1876 path: &str,
1877 auth: &AuthConfig,
1878) -> Result<(), StatusCode> {
1879 if path == "/healthz" || path == "/readyz" || (path == "/metrics" && !auth.protect_metrics) {
1880 return Ok(());
1881 }
1882 let provided = request
1883 .headers()
1884 .get(axum::http::header::AUTHORIZATION)
1885 .and_then(|value| value.to_str().ok());
1886 let permission = match path {
1887 "/metrics" => AuthPermission::Metrics,
1888 "/memory/recall" => AuthPermission::Read,
1889 "/memory/upsert" | "/memory/batch-upsert" => AuthPermission::Write,
1890 "/admin/stats" | "/admin/integrity" | "/admin/repair" | "/admin/snapshot"
1891 | "/admin/compact" | "/admin/delete" | "/admin/archive" | "/admin/suppress"
1892 | "/admin/recover" | "/admin/runtime" | "/admin/export" | "/admin/import" => {
1893 AuthPermission::Admin
1894 }
1895 _ => AuthPermission::Admin,
1896 };
1897 if path.starts_with("/admin/traces") {
1898 return auth
1899 .authorize(provided, AuthPermission::Admin)
1900 .then_some(())
1901 .ok_or(StatusCode::UNAUTHORIZED);
1902 }
1903 if auth.authorize(provided, permission) {
1904 Ok(())
1905 } else {
1906 Err(StatusCode::UNAUTHORIZED)
1907 }
1908}
1909
1910fn validate_grpc_auth<T>(
1911 request: &Request<T>,
1912 auth: &AuthConfig,
1913 permission: AuthPermission,
1914) -> Result<(), Status> {
1915 let provided = request
1916 .metadata()
1917 .get("authorization")
1918 .and_then(|value| value.to_str().ok());
1919 if auth.authorize(provided, permission) {
1920 Ok(())
1921 } else {
1922 Err(Status::unauthenticated("missing or invalid bearer token"))
1923 }
1924}
1925
/// Extracts the token from an `Authorization: Bearer <token>` header value.
///
/// The scheme name is matched case-insensitively (HTTP authentication scheme
/// names are case-insensitive), so `bearer <token>` and `BEARER <token>` are
/// accepted as well. The scheme must be followed by exactly one space; the
/// remainder of the value is returned unchanged.
///
/// Returns `None` when `header` is `None`, too short, or uses another scheme.
fn bearer_token_from_header(header: Option<&str>) -> Option<&str> {
    const SCHEME: &str = "Bearer ";
    let value = header?;
    // `get` yields `None` when the value is too short or the cut would fall
    // inside a multi-byte character, so this never panics.
    let scheme = value.get(..SCHEME.len())?;
    if scheme.eq_ignore_ascii_case(SCHEME) {
        value.get(SCHEME.len()..)
    } else {
        None
    }
}
1929
1930fn invalid_argument(message: impl Into<String>) -> Status {
1931 Status::invalid_argument(message.into())
1932}
1933
1934fn internal_status(message: impl Into<String>) -> Status {
1935 Status::internal(message.into())
1936}
1937
1938fn trust_level_from_proto(value: &str) -> Result<MemoryTrustLevel, Status> {
1939 match value {
1940 "" | "derived" => Ok(MemoryTrustLevel::Derived),
1941 "untrusted" => Ok(MemoryTrustLevel::Untrusted),
1942 "observed" => Ok(MemoryTrustLevel::Observed),
1943 "verified" => Ok(MemoryTrustLevel::Verified),
1944 "pinned" => Ok(MemoryTrustLevel::Pinned),
1945 other => Err(invalid_argument(format!("unknown trust level: {other}"))),
1946 }
1947}
1948
1949fn trust_level_to_proto(value: MemoryTrustLevel) -> String {
1950 match value {
1951 MemoryTrustLevel::Untrusted => "untrusted",
1952 MemoryTrustLevel::Observed => "observed",
1953 MemoryTrustLevel::Derived => "derived",
1954 MemoryTrustLevel::Verified => "verified",
1955 MemoryTrustLevel::Pinned => "pinned",
1956 }
1957 .to_string()
1958}
1959
1960fn record_kind_from_proto(value: &str) -> Result<MemoryRecordKind, Status> {
1961 match value {
1962 "episodic" => Ok(MemoryRecordKind::Episodic),
1963 "summary" => Ok(MemoryRecordKind::Summary),
1964 "fact" => Ok(MemoryRecordKind::Fact),
1965 "preference" => Ok(MemoryRecordKind::Preference),
1966 "task" => Ok(MemoryRecordKind::Task),
1967 "artifact" => Ok(MemoryRecordKind::Artifact),
1968 "hypothesis" => Ok(MemoryRecordKind::Hypothesis),
1969 other => Err(invalid_argument(format!("unknown record kind: {other}"))),
1970 }
1971}
1972
1973fn record_kind_to_proto(value: MemoryRecordKind) -> String {
1974 match value {
1975 MemoryRecordKind::Episodic => "episodic",
1976 MemoryRecordKind::Summary => "summary",
1977 MemoryRecordKind::Fact => "fact",
1978 MemoryRecordKind::Preference => "preference",
1979 MemoryRecordKind::Task => "task",
1980 MemoryRecordKind::Artifact => "artifact",
1981 MemoryRecordKind::Hypothesis => "hypothesis",
1982 }
1983 .to_string()
1984}
1985
1986fn quality_state_from_proto(value: &str) -> Result<MemoryQualityState, Status> {
1987 match value {
1988 "draft" => Ok(MemoryQualityState::Draft),
1989 "active" => Ok(MemoryQualityState::Active),
1990 "verified" => Ok(MemoryQualityState::Verified),
1991 "archived" => Ok(MemoryQualityState::Archived),
1992 "suppressed" => Ok(MemoryQualityState::Suppressed),
1993 "deleted" => Ok(MemoryQualityState::Deleted),
1994 other => Err(invalid_argument(format!("unknown quality state: {other}"))),
1995 }
1996}
1997
1998fn quality_state_to_proto(value: MemoryQualityState) -> String {
1999 match value {
2000 MemoryQualityState::Draft => "draft",
2001 MemoryQualityState::Active => "active",
2002 MemoryQualityState::Verified => "verified",
2003 MemoryQualityState::Archived => "archived",
2004 MemoryQualityState::Suppressed => "suppressed",
2005 MemoryQualityState::Deleted => "deleted",
2006 }
2007 .to_string()
2008}
2009
2010fn scope_from_proto(scope: ProtoMemoryScope) -> Result<MemoryScope, Status> {
2011 Ok(MemoryScope {
2012 tenant_id: scope.tenant_id,
2013 namespace: scope.namespace,
2014 actor_id: scope.actor_id,
2015 conversation_id: scope.conversation_id,
2016 session_id: scope.session_id,
2017 source: scope.source,
2018 labels: scope.labels,
2019 trust_level: trust_level_from_proto(&scope.trust_level)?,
2020 })
2021}
2022
2023fn scope_to_proto(scope: MemoryScope) -> ProtoMemoryScope {
2024 ProtoMemoryScope {
2025 tenant_id: scope.tenant_id,
2026 namespace: scope.namespace,
2027 actor_id: scope.actor_id,
2028 conversation_id: scope.conversation_id,
2029 session_id: scope.session_id,
2030 source: scope.source,
2031 labels: scope.labels,
2032 trust_level: trust_level_to_proto(scope.trust_level),
2033 }
2034}
2035
2036fn artifact_from_proto(value: ProtoArtifactPointer) -> ArtifactPointer {
2037 ArtifactPointer {
2038 uri: value.uri,
2039 media_type: value.media_type,
2040 checksum: value.checksum,
2041 }
2042}
2043
2044fn artifact_to_proto(value: ArtifactPointer) -> ProtoArtifactPointer {
2045 ProtoArtifactPointer {
2046 uri: value.uri,
2047 media_type: value.media_type,
2048 checksum: value.checksum,
2049 }
2050}
2051
2052fn continuity_state_from_proto(value: &str) -> Result<EpisodeContinuityState, Status> {
2053 match value {
2054 "" | "open" => Ok(EpisodeContinuityState::Open),
2055 "resolved" => Ok(EpisodeContinuityState::Resolved),
2056 "superseded" => Ok(EpisodeContinuityState::Superseded),
2057 "abandoned" => Ok(EpisodeContinuityState::Abandoned),
2058 other => Err(invalid_argument(format!(
2059 "unknown continuity state: {other}"
2060 ))),
2061 }
2062}
2063
2064fn continuity_state_to_proto(value: EpisodeContinuityState) -> String {
2065 match value {
2066 EpisodeContinuityState::Open => "open",
2067 EpisodeContinuityState::Resolved => "resolved",
2068 EpisodeContinuityState::Superseded => "superseded",
2069 EpisodeContinuityState::Abandoned => "abandoned",
2070 }
2071 .to_string()
2072}
2073
2074fn affective_provenance_from_proto(value: &str) -> Result<AffectiveAnnotationProvenance, Status> {
2075 match value {
2076 "" | "authored" => Ok(AffectiveAnnotationProvenance::Authored),
2077 "imported" => Ok(AffectiveAnnotationProvenance::Imported),
2078 "derived" => Ok(AffectiveAnnotationProvenance::Derived),
2079 other => Err(invalid_argument(format!(
2080 "unknown affective provenance: {other}"
2081 ))),
2082 }
2083}
2084
2085fn affective_provenance_to_proto(value: AffectiveAnnotationProvenance) -> String {
2086 match value {
2087 AffectiveAnnotationProvenance::Authored => "authored",
2088 AffectiveAnnotationProvenance::Imported => "imported",
2089 AffectiveAnnotationProvenance::Derived => "derived",
2090 }
2091 .to_string()
2092}
2093
2094fn temporal_order_from_proto(value: &str) -> Result<RecallTemporalOrder, Status> {
2095 match value {
2096 "" | "relevance" => Ok(RecallTemporalOrder::Relevance),
2097 "chronological_asc" => Ok(RecallTemporalOrder::ChronologicalAsc),
2098 "chronological_desc" => Ok(RecallTemporalOrder::ChronologicalDesc),
2099 other => Err(invalid_argument(format!("unknown temporal order: {other}"))),
2100 }
2101}
2102
2103fn planning_profile_from_proto(value: &str) -> Result<RecallPlanningProfile, Status> {
2104 match value {
2105 "" | "fast_path" => Ok(RecallPlanningProfile::FastPath),
2106 "continuity_aware" => Ok(RecallPlanningProfile::ContinuityAware),
2107 other => Err(invalid_argument(format!(
2108 "unknown planning profile: {other}"
2109 ))),
2110 }
2111}
2112
2113fn planning_profile_to_proto(value: RecallPlanningProfile) -> String {
2114 match value {
2115 RecallPlanningProfile::FastPath => "fast_path",
2116 RecallPlanningProfile::ContinuityAware => "continuity_aware",
2117 }
2118 .to_string()
2119}
2120
2121fn planner_stage_to_proto(value: RecallPlannerStage) -> String {
2122 match value {
2123 RecallPlannerStage::CandidateGeneration => "candidate_generation",
2124 RecallPlannerStage::GraphExpansion => "graph_expansion",
2125 RecallPlannerStage::Selection => "selection",
2126 }
2127 .to_string()
2128}
2129
2130fn candidate_source_to_proto(value: RecallCandidateSource) -> String {
2131 match value {
2132 RecallCandidateSource::Lexical => "lexical",
2133 RecallCandidateSource::Semantic => "semantic",
2134 RecallCandidateSource::Metadata => "metadata",
2135 RecallCandidateSource::Episode => "episode",
2136 RecallCandidateSource::Graph => "graph",
2137 RecallCandidateSource::Temporal => "temporal",
2138 RecallCandidateSource::Provenance => "provenance",
2139 }
2140 .to_string()
2141}
2142
2143fn affective_annotation_from_proto(
2144 value: ProtoAffectiveAnnotation,
2145) -> Result<AffectiveAnnotation, Status> {
2146 Ok(AffectiveAnnotation {
2147 tone: value.tone,
2148 sentiment: value.sentiment,
2149 urgency: value.urgency,
2150 confidence: value.confidence,
2151 tension: value.tension,
2152 provenance: affective_provenance_from_proto(&value.provenance)?,
2153 })
2154}
2155
2156fn affective_annotation_to_proto(value: AffectiveAnnotation) -> ProtoAffectiveAnnotation {
2157 ProtoAffectiveAnnotation {
2158 tone: value.tone,
2159 sentiment: value.sentiment,
2160 urgency: value.urgency,
2161 confidence: value.confidence,
2162 tension: value.tension,
2163 provenance: affective_provenance_to_proto(value.provenance),
2164 }
2165}
2166
2167fn historical_state_from_proto(value: &str) -> Result<MemoryHistoricalState, Status> {
2168 match value {
2169 "" | "current" => Ok(MemoryHistoricalState::Current),
2170 "historical" => Ok(MemoryHistoricalState::Historical),
2171 "superseded" => Ok(MemoryHistoricalState::Superseded),
2172 other => Err(invalid_argument(format!(
2173 "unknown historical state: {other}"
2174 ))),
2175 }
2176}
2177
2178fn historical_state_to_proto(value: MemoryHistoricalState) -> String {
2179 match value {
2180 MemoryHistoricalState::Current => "current",
2181 MemoryHistoricalState::Historical => "historical",
2182 MemoryHistoricalState::Superseded => "superseded",
2183 }
2184 .to_string()
2185}
2186
2187fn lineage_relation_from_proto(value: &str) -> Result<LineageRelationKind, Status> {
2188 match value {
2189 "" | "derived_from" => Ok(LineageRelationKind::DerivedFrom),
2190 "consolidated_from" => Ok(LineageRelationKind::ConsolidatedFrom),
2191 "supersedes" => Ok(LineageRelationKind::Supersedes),
2192 "superseded_by" => Ok(LineageRelationKind::SupersededBy),
2193 "conflicts_with" => Ok(LineageRelationKind::ConflictsWith),
2194 other => Err(invalid_argument(format!(
2195 "unknown lineage relation: {other}"
2196 ))),
2197 }
2198}
2199
2200fn lineage_relation_to_proto(value: LineageRelationKind) -> String {
2201 match value {
2202 LineageRelationKind::DerivedFrom => "derived_from",
2203 LineageRelationKind::ConsolidatedFrom => "consolidated_from",
2204 LineageRelationKind::Supersedes => "supersedes",
2205 LineageRelationKind::SupersededBy => "superseded_by",
2206 LineageRelationKind::ConflictsWith => "conflicts_with",
2207 }
2208 .to_string()
2209}
2210
2211fn lineage_link_from_proto(value: ProtoLineageLink) -> Result<LineageLink, Status> {
2212 Ok(LineageLink {
2213 record_id: value.record_id,
2214 relation: lineage_relation_from_proto(&value.relation)?,
2215 confidence: value.confidence,
2216 })
2217}
2218
2219fn lineage_link_to_proto(value: LineageLink) -> ProtoLineageLink {
2220 ProtoLineageLink {
2221 record_id: value.record_id,
2222 relation: lineage_relation_to_proto(value.relation),
2223 confidence: value.confidence,
2224 }
2225}
2226
2227fn historical_mode_from_proto(value: &str) -> Result<RecallHistoricalMode, Status> {
2228 match value {
2229 "" | "current_only" => Ok(RecallHistoricalMode::CurrentOnly),
2230 "include_historical" => Ok(RecallHistoricalMode::IncludeHistorical),
2231 "historical_only" => Ok(RecallHistoricalMode::HistoricalOnly),
2232 other => Err(invalid_argument(format!(
2233 "unknown historical mode: {other}"
2234 ))),
2235 }
2236}
2237
2238fn episode_context_from_proto(value: ProtoEpisodeContext) -> Result<EpisodeContext, Status> {
2239 Ok(EpisodeContext {
2240 schema_version: if value.schema_version == 0 {
2241 EPISODE_SCHEMA_VERSION
2242 } else {
2243 value.schema_version
2244 },
2245 episode_id: value.episode_id,
2246 summary: value.summary,
2247 continuity_state: continuity_state_from_proto(&value.continuity_state)?,
2248 actor_ids: value.actor_ids,
2249 goal: value.goal,
2250 outcome: value.outcome,
2251 started_at_unix_ms: value.started_at_unix_ms,
2252 ended_at_unix_ms: value.ended_at_unix_ms,
2253 last_active_unix_ms: value.last_active_unix_ms,
2254 recurrence_key: value.recurrence_key,
2255 recurrence_interval_ms: value.recurrence_interval_ms,
2256 boundary_label: value.boundary_label,
2257 previous_record_id: value.previous_record_id,
2258 next_record_id: value.next_record_id,
2259 causal_record_ids: value.causal_record_ids,
2260 related_record_ids: value.related_record_ids,
2261 linked_artifact_uris: value.linked_artifact_uris,
2262 salience: value
2263 .salience
2264 .map_or_else(EpisodeSalience::default, |salience| EpisodeSalience {
2265 reuse_count: salience.reuse_count,
2266 novelty_score: salience.novelty_score,
2267 goal_relevance: salience.goal_relevance,
2268 unresolved_weight: salience.unresolved_weight,
2269 }),
2270 affective: value
2271 .affective
2272 .map(affective_annotation_from_proto)
2273 .transpose()?,
2274 })
2275}
2276
2277fn episode_context_to_proto(value: EpisodeContext) -> ProtoEpisodeContext {
2278 ProtoEpisodeContext {
2279 schema_version: value.schema_version,
2280 episode_id: value.episode_id,
2281 summary: value.summary,
2282 continuity_state: continuity_state_to_proto(value.continuity_state),
2283 actor_ids: value.actor_ids,
2284 goal: value.goal,
2285 outcome: value.outcome,
2286 started_at_unix_ms: value.started_at_unix_ms,
2287 ended_at_unix_ms: value.ended_at_unix_ms,
2288 last_active_unix_ms: value.last_active_unix_ms,
2289 recurrence_key: value.recurrence_key,
2290 recurrence_interval_ms: value.recurrence_interval_ms,
2291 boundary_label: value.boundary_label,
2292 previous_record_id: value.previous_record_id,
2293 next_record_id: value.next_record_id,
2294 causal_record_ids: value.causal_record_ids,
2295 related_record_ids: value.related_record_ids,
2296 linked_artifact_uris: value.linked_artifact_uris,
2297 salience: Some(ProtoEpisodeSalience {
2298 reuse_count: value.salience.reuse_count,
2299 novelty_score: value.salience.novelty_score,
2300 goal_relevance: value.salience.goal_relevance,
2301 unresolved_weight: value.salience.unresolved_weight,
2302 }),
2303 affective: value.affective.map(affective_annotation_to_proto),
2304 }
2305}
2306
2307fn record_from_proto(record: ProtoMemoryRecord) -> Result<MemoryRecord, Status> {
2308 Ok(MemoryRecord {
2309 id: record.id,
2310 scope: scope_from_proto(
2311 record
2312 .scope
2313 .ok_or_else(|| invalid_argument("memory record scope is required"))?,
2314 )?,
2315 kind: record_kind_from_proto(&record.kind)?,
2316 content: record.content,
2317 summary: record.summary,
2318 source_id: record.source_id,
2319 metadata: record.metadata.into_iter().collect(),
2320 quality_state: quality_state_from_proto(&record.quality_state)?,
2321 created_at_unix_ms: record.created_at_unix_ms,
2322 updated_at_unix_ms: record.updated_at_unix_ms,
2323 expires_at_unix_ms: record.expires_at_unix_ms,
2324 importance_score: record.importance_score,
2325 artifact: record.artifact.map(artifact_from_proto),
2326 episode: record.episode.map(episode_context_from_proto).transpose()?,
2327 historical_state: historical_state_from_proto(
2328 record.historical_state.as_deref().unwrap_or(""),
2329 )?,
2330 lineage: record
2331 .lineage
2332 .into_iter()
2333 .map(lineage_link_from_proto)
2334 .collect::<Result<Vec<_>, _>>()?,
2335 })
2336}
2337
2338fn record_to_proto(record: MemoryRecord) -> ProtoMemoryRecord {
2339 ProtoMemoryRecord {
2340 id: record.id,
2341 scope: Some(scope_to_proto(record.scope)),
2342 kind: record_kind_to_proto(record.kind),
2343 content: record.content,
2344 summary: record.summary,
2345 metadata: record.metadata.into_iter().collect(),
2346 quality_state: quality_state_to_proto(record.quality_state),
2347 created_at_unix_ms: record.created_at_unix_ms,
2348 updated_at_unix_ms: record.updated_at_unix_ms,
2349 expires_at_unix_ms: record.expires_at_unix_ms,
2350 importance_score: record.importance_score,
2351 source_id: record.source_id,
2352 artifact: record.artifact.map(artifact_to_proto),
2353 episode: record.episode.map(episode_context_to_proto),
2354 historical_state: Some(historical_state_to_proto(record.historical_state)),
2355 lineage: record
2356 .lineage
2357 .into_iter()
2358 .map(lineage_link_to_proto)
2359 .collect(),
2360 }
2361}
2362
2363fn recall_explanation_to_proto(value: RecallExplanation) -> ProtoRecallExplanation {
2364 ProtoRecallExplanation {
2365 selected_channels: value.selected_channels,
2366 policy_notes: value.policy_notes,
2367 trace_id: value.trace_id,
2368 planning_trace: value.planning_trace.map(recall_planning_trace_to_proto),
2369 scorer_kind: value
2370 .scorer_kind
2371 .map(recall_scorer_kind_to_proto)
2372 .unwrap_or(ProtoRecallScorerKind::Unspecified as i32),
2373 scoring_profile: value
2374 .scoring_profile
2375 .map(recall_scoring_profile_to_proto)
2376 .unwrap_or(ProtoRecallScoringProfile::Unspecified as i32),
2377 planning_profile: value.planning_profile.map(planning_profile_to_proto),
2378 policy_profile: value
2379 .policy_profile
2380 .map(recall_policy_profile_to_proto)
2381 .unwrap_or(ProtoRecallPolicyProfile::Unspecified as i32),
2382 }
2383}
2384
2385fn recall_scorer_kind_to_proto(value: RecallScorerKind) -> i32 {
2386 match value {
2387 RecallScorerKind::Profile => ProtoRecallScorerKind::Profile as i32,
2388 RecallScorerKind::Curated => ProtoRecallScorerKind::Curated as i32,
2389 }
2390}
2391
2392fn recall_scoring_profile_to_proto(value: RecallScoringProfile) -> i32 {
2393 match value {
2394 RecallScoringProfile::Balanced => ProtoRecallScoringProfile::Balanced as i32,
2395 RecallScoringProfile::LexicalFirst => ProtoRecallScoringProfile::LexicalFirst as i32,
2396 RecallScoringProfile::ImportanceFirst => ProtoRecallScoringProfile::ImportanceFirst as i32,
2397 }
2398}
2399
2400fn recall_policy_profile_to_proto(value: RecallPolicyProfile) -> i32 {
2401 match value {
2402 RecallPolicyProfile::General => ProtoRecallPolicyProfile::General as i32,
2403 RecallPolicyProfile::Support => ProtoRecallPolicyProfile::Support as i32,
2404 RecallPolicyProfile::Research => ProtoRecallPolicyProfile::Research as i32,
2405 RecallPolicyProfile::Assistant => ProtoRecallPolicyProfile::Assistant as i32,
2406 RecallPolicyProfile::AutonomousAgent => ProtoRecallPolicyProfile::AutonomousAgent as i32,
2407 }
2408}
2409
2410fn embedding_provider_kind_to_proto(value: EmbeddingProviderKind) -> i32 {
2411 match value {
2412 EmbeddingProviderKind::Disabled => ProtoEmbeddingProviderKind::Disabled as i32,
2413 EmbeddingProviderKind::DeterministicLocal => {
2414 ProtoEmbeddingProviderKind::DeterministicLocal as i32
2415 }
2416 }
2417}
2418
2419fn engine_tuning_info_to_proto(value: EngineTuningInfo) -> ProtoEngineTuningInfo {
2420 ProtoEngineTuningInfo {
2421 recall_scorer_kind: recall_scorer_kind_to_proto(value.recall_scorer_kind),
2422 recall_scoring_profile: recall_scoring_profile_to_proto(value.recall_scoring_profile),
2423 embedding_provider_kind: embedding_provider_kind_to_proto(value.embedding_provider_kind),
2424 embedding_dimensions: value.embedding_dimensions as u64,
2425 graph_expansion_max_hops: u32::from(value.graph_expansion_max_hops),
2426 compaction_summarize_after_record_count: value.compaction_summarize_after_record_count
2427 as u64,
2428 compaction_cold_archive_after_days: value.compaction_cold_archive_after_days,
2429 compaction_cold_archive_importance_threshold_per_mille: u32::from(
2430 value.compaction_cold_archive_importance_threshold_per_mille,
2431 ),
2432 recall_planning_profile: Some(planning_profile_to_proto(value.recall_planning_profile)),
2433 recall_policy_profile: recall_policy_profile_to_proto(value.recall_policy_profile),
2434 }
2435}
2436
2437fn engine_tuning_info_from_proto(value: ProtoEngineTuningInfo) -> Result<EngineTuningInfo, Status> {
2438 Ok(EngineTuningInfo {
2439 recall_scorer_kind: match ProtoRecallScorerKind::try_from(value.recall_scorer_kind)
2440 .unwrap_or(ProtoRecallScorerKind::Unspecified)
2441 {
2442 ProtoRecallScorerKind::Profile => RecallScorerKind::Profile,
2443 ProtoRecallScorerKind::Curated => RecallScorerKind::Curated,
2444 ProtoRecallScorerKind::Unspecified => {
2445 return Err(invalid_argument("engine recall scorer kind is required"));
2446 }
2447 },
2448 recall_scoring_profile: match ProtoRecallScoringProfile::try_from(
2449 value.recall_scoring_profile,
2450 )
2451 .unwrap_or(ProtoRecallScoringProfile::Unspecified)
2452 {
2453 ProtoRecallScoringProfile::Balanced => RecallScoringProfile::Balanced,
2454 ProtoRecallScoringProfile::LexicalFirst => RecallScoringProfile::LexicalFirst,
2455 ProtoRecallScoringProfile::ImportanceFirst => RecallScoringProfile::ImportanceFirst,
2456 ProtoRecallScoringProfile::Unspecified => {
2457 return Err(invalid_argument(
2458 "engine recall scoring profile is required",
2459 ));
2460 }
2461 },
2462 recall_planning_profile: planning_profile_from_proto(
2463 value.recall_planning_profile.as_deref().unwrap_or(""),
2464 )?,
2465 recall_policy_profile: match ProtoRecallPolicyProfile::try_from(value.recall_policy_profile)
2466 .unwrap_or(ProtoRecallPolicyProfile::Unspecified)
2467 {
2468 ProtoRecallPolicyProfile::General => RecallPolicyProfile::General,
2469 ProtoRecallPolicyProfile::Support => RecallPolicyProfile::Support,
2470 ProtoRecallPolicyProfile::Research => RecallPolicyProfile::Research,
2471 ProtoRecallPolicyProfile::Assistant => RecallPolicyProfile::Assistant,
2472 ProtoRecallPolicyProfile::AutonomousAgent => RecallPolicyProfile::AutonomousAgent,
2473 ProtoRecallPolicyProfile::Unspecified => RecallPolicyProfile::General,
2474 },
2475 embedding_provider_kind: match ProtoEmbeddingProviderKind::try_from(
2476 value.embedding_provider_kind,
2477 )
2478 .unwrap_or(ProtoEmbeddingProviderKind::Unspecified)
2479 {
2480 ProtoEmbeddingProviderKind::Disabled => EmbeddingProviderKind::Disabled,
2481 ProtoEmbeddingProviderKind::DeterministicLocal => {
2482 EmbeddingProviderKind::DeterministicLocal
2483 }
2484 ProtoEmbeddingProviderKind::Unspecified => {
2485 return Err(invalid_argument(
2486 "engine embedding provider kind is required",
2487 ));
2488 }
2489 },
2490 embedding_dimensions: value.embedding_dimensions as usize,
2491 graph_expansion_max_hops: value.graph_expansion_max_hops as u8,
2492 compaction_summarize_after_record_count: value.compaction_summarize_after_record_count
2493 as usize,
2494 compaction_cold_archive_after_days: value.compaction_cold_archive_after_days,
2495 compaction_cold_archive_importance_threshold_per_mille: value
2496 .compaction_cold_archive_importance_threshold_per_mille
2497 as u16,
2498 })
2499}
2500
2501fn recall_planning_trace_to_proto(value: RecallPlanningTrace) -> ProtoRecallPlanningTrace {
2502 ProtoRecallPlanningTrace {
2503 trace_id: value.trace_id,
2504 token_budget_applied: value.token_budget_applied,
2505 candidates: value
2506 .candidates
2507 .into_iter()
2508 .map(recall_trace_candidate_to_proto)
2509 .collect(),
2510 }
2511}
2512
2513fn recall_trace_candidate_to_proto(value: RecallTraceCandidate) -> ProtoRecallTraceCandidate {
2514 ProtoRecallTraceCandidate {
2515 record_id: value.record_id,
2516 kind: record_kind_to_proto(value.kind),
2517 selected: value.selected,
2518 matched_terms: value.matched_terms,
2519 decision_reason: value.decision_reason,
2520 breakdown: Some(recall_score_breakdown_to_proto(value.breakdown)),
2521 selection_rank: value.selection_rank,
2522 selected_channels: value.selected_channels,
2523 filter_reasons: value.filter_reasons,
2524 candidate_sources: value
2525 .candidate_sources
2526 .into_iter()
2527 .map(candidate_source_to_proto)
2528 .collect(),
2529 planner_stage: Some(planner_stage_to_proto(value.planner_stage)),
2530 }
2531}
2532
2533fn recall_score_breakdown_to_proto(
2534 value: mnemara_core::RecallScoreBreakdown,
2535) -> ProtoRecallScoreBreakdown {
2536 ProtoRecallScoreBreakdown {
2537 lexical: value.lexical,
2538 semantic: value.semantic,
2539 graph: value.graph,
2540 temporal: value.temporal,
2541 policy: value.policy,
2542 total: value.total,
2543 metadata: value.metadata,
2544 curation: value.curation,
2545 episodic: value.episodic,
2546 salience: value.salience,
2547 }
2548}
2549
2550fn namespace_stats_to_proto(value: NamespaceStats) -> ProtoNamespaceStats {
2551 ProtoNamespaceStats {
2552 tenant_id: value.tenant_id,
2553 namespace: value.namespace,
2554 active_records: value.active_records,
2555 archived_records: value.archived_records,
2556 deleted_records: value.deleted_records,
2557 suppressed_records: value.suppressed_records,
2558 pinned_records: value.pinned_records,
2559 }
2560}
2561
2562fn maintenance_stats_to_proto(value: MaintenanceStats) -> ProtoMaintenanceStats {
2563 ProtoMaintenanceStats {
2564 duplicate_candidate_groups: value.duplicate_candidate_groups,
2565 duplicate_candidate_records: value.duplicate_candidate_records,
2566 tombstoned_records: value.tombstoned_records,
2567 expired_records: value.expired_records,
2568 stale_idempotency_keys: value.stale_idempotency_keys,
2569 historical_records: value.historical_records,
2570 superseded_records: value.superseded_records,
2571 lineage_links: value.lineage_links,
2572 }
2573}
2574
2575fn portable_record_to_proto(value: PortableRecord) -> ProtoPortableRecord {
2576 ProtoPortableRecord {
2577 record: Some(record_to_proto(value.record)),
2578 idempotency_key: value.idempotency_key,
2579 }
2580}
2581
2582fn snapshot_manifest_to_proto(value: SnapshotManifest) -> SnapshotReply {
2583 SnapshotReply {
2584 snapshot_id: value.snapshot_id,
2585 created_at_unix_ms: value.created_at_unix_ms,
2586 namespaces: value.namespaces,
2587 record_count: value.record_count,
2588 storage_bytes: value.storage_bytes,
2589 engine: Some(engine_tuning_info_to_proto(value.engine)),
2590 }
2591}
2592
2593fn portable_package_to_proto(value: PortableStorePackage) -> ProtoExportReply {
2594 ProtoExportReply {
2595 package_version: value.package_version,
2596 exported_at_unix_ms: value.exported_at_unix_ms,
2597 manifest: Some(snapshot_manifest_to_proto(value.manifest)),
2598 records: value
2599 .records
2600 .into_iter()
2601 .map(portable_record_to_proto)
2602 .collect(),
2603 }
2604}
2605
2606fn import_mode_from_proto(value: i32) -> Result<ImportMode, Status> {
2607 match ProtoImportMode::try_from(value).unwrap_or(ProtoImportMode::Unspecified) {
2608 ProtoImportMode::Validate => Ok(ImportMode::Validate),
2609 ProtoImportMode::Merge => Ok(ImportMode::Merge),
2610 ProtoImportMode::Replace => Ok(ImportMode::Replace),
2611 ProtoImportMode::Unspecified => Err(invalid_argument("import mode is required")),
2612 }
2613}
2614
2615fn portable_record_from_proto(value: ProtoPortableRecord) -> Result<PortableRecord, Status> {
2616 Ok(PortableRecord {
2617 record: record_from_proto(
2618 value
2619 .record
2620 .ok_or_else(|| invalid_argument("portable record payload is required"))?,
2621 )?,
2622 idempotency_key: value.idempotency_key,
2623 })
2624}
2625
2626fn snapshot_manifest_from_proto(value: SnapshotReply) -> Result<SnapshotManifest, Status> {
2627 Ok(SnapshotManifest {
2628 snapshot_id: value.snapshot_id,
2629 created_at_unix_ms: value.created_at_unix_ms,
2630 namespaces: value.namespaces,
2631 record_count: value.record_count,
2632 storage_bytes: value.storage_bytes,
2633 engine: value
2634 .engine
2635 .map(engine_tuning_info_from_proto)
2636 .transpose()?
2637 .ok_or_else(|| invalid_argument("snapshot engine tuning info is required"))?,
2638 })
2639}
2640
2641fn portable_package_from_proto(value: ProtoExportReply) -> Result<PortableStorePackage, Status> {
2642 Ok(PortableStorePackage {
2643 package_version: value.package_version,
2644 exported_at_unix_ms: value.exported_at_unix_ms,
2645 manifest: snapshot_manifest_from_proto(
2646 value
2647 .manifest
2648 .ok_or_else(|| invalid_argument("portable export manifest is required"))?,
2649 )?,
2650 records: value
2651 .records
2652 .into_iter()
2653 .map(portable_record_from_proto)
2654 .collect::<Result<Vec<_>, _>>()?,
2655 })
2656}
2657
2658fn trace_operation_kind_to_proto(value: TraceOperationKind) -> i32 {
2659 match value {
2660 TraceOperationKind::Upsert => ProtoTraceOperationKind::Upsert as i32,
2661 TraceOperationKind::BatchUpsert => ProtoTraceOperationKind::BatchUpsert as i32,
2662 TraceOperationKind::Recall => ProtoTraceOperationKind::Recall as i32,
2663 TraceOperationKind::Snapshot => ProtoTraceOperationKind::Snapshot as i32,
2664 TraceOperationKind::Stats => ProtoTraceOperationKind::Stats as i32,
2665 TraceOperationKind::IntegrityCheck => ProtoTraceOperationKind::IntegrityCheck as i32,
2666 TraceOperationKind::Repair => ProtoTraceOperationKind::Repair as i32,
2667 TraceOperationKind::Compact => ProtoTraceOperationKind::Compact as i32,
2668 TraceOperationKind::Delete => ProtoTraceOperationKind::Delete as i32,
2669 TraceOperationKind::Archive => ProtoTraceOperationKind::Archive as i32,
2670 TraceOperationKind::Suppress => ProtoTraceOperationKind::Suppress as i32,
2671 TraceOperationKind::Recover => ProtoTraceOperationKind::Recover as i32,
2672 TraceOperationKind::Export => ProtoTraceOperationKind::Export as i32,
2673 TraceOperationKind::Import => ProtoTraceOperationKind::Import as i32,
2674 }
2675}
2676
2677fn trace_operation_kind_from_proto(value: ProtoTraceOperationKind) -> Option<TraceOperationKind> {
2678 match value {
2679 ProtoTraceOperationKind::Upsert => Some(TraceOperationKind::Upsert),
2680 ProtoTraceOperationKind::BatchUpsert => Some(TraceOperationKind::BatchUpsert),
2681 ProtoTraceOperationKind::Recall => Some(TraceOperationKind::Recall),
2682 ProtoTraceOperationKind::Snapshot => Some(TraceOperationKind::Snapshot),
2683 ProtoTraceOperationKind::Stats => Some(TraceOperationKind::Stats),
2684 ProtoTraceOperationKind::IntegrityCheck => Some(TraceOperationKind::IntegrityCheck),
2685 ProtoTraceOperationKind::Repair => Some(TraceOperationKind::Repair),
2686 ProtoTraceOperationKind::Compact => Some(TraceOperationKind::Compact),
2687 ProtoTraceOperationKind::Delete => Some(TraceOperationKind::Delete),
2688 ProtoTraceOperationKind::Archive => Some(TraceOperationKind::Archive),
2689 ProtoTraceOperationKind::Suppress => Some(TraceOperationKind::Suppress),
2690 ProtoTraceOperationKind::Recover => Some(TraceOperationKind::Recover),
2691 ProtoTraceOperationKind::Export => Some(TraceOperationKind::Export),
2692 ProtoTraceOperationKind::Import => Some(TraceOperationKind::Import),
2693 ProtoTraceOperationKind::Unspecified => None,
2694 }
2695}
2696
2697fn trace_status_to_proto(value: TraceStatus) -> i32 {
2698 match value {
2699 TraceStatus::Ok => ProtoTraceStatus::Ok as i32,
2700 TraceStatus::Rejected => ProtoTraceStatus::Rejected as i32,
2701 TraceStatus::Error => ProtoTraceStatus::Error as i32,
2702 }
2703}
2704
2705fn trace_status_from_proto(value: ProtoTraceStatus) -> Option<TraceStatus> {
2706 match value {
2707 ProtoTraceStatus::Ok => Some(TraceStatus::Ok),
2708 ProtoTraceStatus::Rejected => Some(TraceStatus::Rejected),
2709 ProtoTraceStatus::Error => Some(TraceStatus::Error),
2710 ProtoTraceStatus::Unspecified => None,
2711 }
2712}
2713
2714fn operation_trace_to_proto(value: OperationTrace) -> ProtoOperationTrace {
2715 ProtoOperationTrace {
2716 trace_id: value.trace_id,
2717 correlation_id: value.correlation_id,
2718 operation: trace_operation_kind_to_proto(value.operation),
2719 transport: value.transport,
2720 backend: value.backend,
2721 admission_class: value.admission_class,
2722 tenant_id: value.tenant_id,
2723 namespace: value.namespace,
2724 principal: value.principal,
2725 store_span_id: value.store_span_id,
2726 planning_trace_id: value.planning_trace_id,
2727 started_at_unix_ms: value.started_at_unix_ms,
2728 completed_at_unix_ms: value.completed_at_unix_ms,
2729 latency_ms: value.latency_ms,
2730 status: trace_status_to_proto(value.status),
2731 status_message: value.status_message,
2732 summary: Some(ProtoOperationTraceSummary {
2733 record_id: value.summary.record_id,
2734 request_count: value.summary.request_count,
2735 query_text: value.summary.query_text,
2736 max_items: value.summary.max_items,
2737 token_budget: value.summary.token_budget,
2738 dry_run: value.summary.dry_run,
2739 }),
2740 recall_explanation: value.recall_explanation.map(recall_explanation_to_proto),
2741 }
2742}
2743
2744fn kinds_from_proto(values: Vec<String>) -> Result<Vec<MemoryRecordKind>, Status> {
2745 values
2746 .into_iter()
2747 .map(|value| record_kind_from_proto(&value))
2748 .collect()
2749}
2750
2751fn trust_levels_from_proto(values: Vec<String>) -> Result<Vec<MemoryTrustLevel>, Status> {
2752 values
2753 .into_iter()
2754 .map(|value| trust_level_from_proto(&value))
2755 .collect()
2756}
2757
2758fn quality_states_from_proto(values: Vec<String>) -> Result<Vec<MemoryQualityState>, Status> {
2759 values
2760 .into_iter()
2761 .map(|value| quality_state_from_proto(&value))
2762 .collect()
2763}
2764
2765fn recall_filters_from_proto(filters: Option<ProtoRecallFilters>) -> Result<RecallFilters, Status> {
2766 let Some(filters) = filters else {
2767 return Ok(RecallFilters::default());
2768 };
2769
2770 Ok(RecallFilters {
2771 kinds: kinds_from_proto(filters.kinds)?,
2772 required_labels: filters.required_labels,
2773 source: filters.source,
2774 from_unix_ms: filters.from_unix_ms,
2775 to_unix_ms: filters.to_unix_ms,
2776 min_importance_score: filters.min_importance_score,
2777 trust_levels: trust_levels_from_proto(filters.trust_levels)?,
2778 states: quality_states_from_proto(filters.states)?,
2779 include_archived: filters.include_archived,
2780 episode_id: filters.episode_id,
2781 continuity_states: filters
2782 .continuity_states
2783 .into_iter()
2784 .map(|value| continuity_state_from_proto(&value))
2785 .collect::<Result<Vec<_>, _>>()?,
2786 unresolved_only: filters.unresolved_only,
2787 temporal_order: temporal_order_from_proto(filters.temporal_order.as_deref().unwrap_or(""))?,
2788 historical_mode: historical_mode_from_proto(
2789 filters.historical_mode.as_deref().unwrap_or(""),
2790 )?,
2791 lineage_record_id: filters.lineage_record_id,
2792 })
2793}
2794
2795fn validate_scope_limits(scope: &MemoryScope, limits: &ServerLimits) -> Result<(), Status> {
2796 if scope.labels.len() > limits.max_labels_per_scope {
2797 return Err(invalid_argument(format!(
2798 "scope labels exceed configured max of {}",
2799 limits.max_labels_per_scope
2800 )));
2801 }
2802 Ok(())
2803}
2804
2805fn validate_record_limits(record: &MemoryRecord, limits: &ServerLimits) -> Result<(), Status> {
2806 validate_scope_limits(&record.scope, limits)?;
2807 if record.content.len() > limits.max_record_content_bytes {
2808 return Err(invalid_argument(format!(
2809 "record content exceeds configured max of {} bytes",
2810 limits.max_record_content_bytes
2811 )));
2812 }
2813 if record.summary.as_deref().map(str::len).unwrap_or(0) > limits.max_query_text_bytes {
2814 return Err(invalid_argument(format!(
2815 "record summary exceeds configured max of {} bytes",
2816 limits.max_query_text_bytes
2817 )));
2818 }
2819 Ok(())
2820}
2821
2822fn validate_recall_limits(query: &RecallQuery, limits: &ServerLimits) -> Result<(), Status> {
2823 validate_scope_limits(&query.scope, limits)?;
2824 if query.max_items == 0 {
2825 return Err(invalid_argument("recall max_items must be greater than 0"));
2826 }
2827 if query.max_items > limits.max_recall_items {
2828 return Err(invalid_argument(format!(
2829 "recall max_items exceeds configured max of {}",
2830 limits.max_recall_items
2831 )));
2832 }
2833 if query.query_text.len() > limits.max_query_text_bytes {
2834 return Err(invalid_argument(format!(
2835 "query text exceeds configured max of {} bytes",
2836 limits.max_query_text_bytes
2837 )));
2838 }
2839 Ok(())
2840}
2841
2842fn map_store_error(error: mnemara_core::Error) -> Status {
2843 match error {
2844 mnemara_core::Error::InvalidConfig(message)
2845 | mnemara_core::Error::InvalidRequest(message) => invalid_argument(message),
2846 mnemara_core::Error::Conflict(message) => Status::already_exists(message),
2847 mnemara_core::Error::Unsupported(message) => Status::unimplemented(message),
2848 mnemara_core::Error::Backend(message) => internal_status(message),
2849 }
2850}
2851
2852fn map_http_store_error(error: mnemara_core::Error) -> (StatusCode, Json<HttpErrorBody>) {
2853 let status = match error {
2854 mnemara_core::Error::InvalidConfig(_) | mnemara_core::Error::InvalidRequest(_) => {
2855 StatusCode::BAD_REQUEST
2856 }
2857 mnemara_core::Error::Conflict(_) => StatusCode::CONFLICT,
2858 mnemara_core::Error::Unsupported(_) => StatusCode::NOT_IMPLEMENTED,
2859 mnemara_core::Error::Backend(_) => StatusCode::INTERNAL_SERVER_ERROR,
2860 };
2861 (
2862 status,
2863 Json(HttpErrorBody {
2864 error: error.to_string(),
2865 }),
2866 )
2867}
2868
2869fn record_grpc_result<T>(
2870 metrics: &MethodMetrics,
2871 result: Result<Response<T>, Status>,
2872) -> Result<Response<T>, Status> {
2873 match result {
2874 Ok(response) => {
2875 metrics.record_status(&Status::ok(""));
2876 Ok(response)
2877 }
2878 Err(status) => {
2879 metrics.record_status(&status);
2880 Err(status)
2881 }
2882 }
2883}
2884
2885#[tonic::async_trait]
2886impl<S> MemoryService for GrpcMemoryService<S>
2887where
2888 S: MemoryStore + 'static,
2889{
    /// Handles a single-record upsert over gRPC.
    ///
    /// Steps, in order: mark the call as started in the `grpc_upsert` metrics, check the
    /// caller for `AuthPermission::Write`, convert and validate the record, acquire a
    /// write-class admission permit for the record's tenant, forward the upsert to the store,
    /// record a trace with `TraceStatus::Ok`, and reply with the store receipt.
    ///
    /// # Errors
    ///
    /// Returns the status produced by auth validation, record conversion, limit validation,
    /// admission (mapped by `map_grpc_admission_error`) or the store (mapped by
    /// `map_store_error`).
    async fn upsert_memory_record(
        &self,
        request: Request<ProtoUpsertMemoryRecordRequest>,
    ) -> Result<Response<UpsertMemoryRecordReply>, Status> {
        self.runtime.metrics().grpc_upsert.record_started();
        // NOTE(review): an auth failure returns here via `?`, before `record_grpc_result`
        // runs, so the started call gets no recorded status — confirm this is intended.
        validate_grpc_auth(
            &request,
            self.runtime.auth().as_ref(),
            AuthPermission::Write,
        )?;
        // Captured before `request` is consumed inside the async block below.
        let principal = grpc_principal(&request);
        let started_at_unix_ms = now_unix_ms();
        let correlation_id = self.runtime.traces().next_id("corr");
        // The body runs in an async block so every `?` failure inside it is funnelled
        // through `record_grpc_result` instead of returning from the method directly.
        let result = async {
            let request = request.into_inner();
            let record = record_from_proto(
                request
                    .record
                    .ok_or_else(|| invalid_argument("memory record is required"))?,
            )?;
            // Cloned up front because `record` is moved into the store request below.
            let tenant_id = record.scope.tenant_id.clone();
            let namespace = record.scope.namespace.clone();
            validate_record_limits(&record, self.runtime.limits().as_ref())?;
            // Bound to a name so the permit lives until the end of the async block;
            // presumably it releases admission capacity on drop — verify in `admission`.
            let _permit = self
                .runtime
                .admission()
                .acquire(AdmissionClass::Write, Some(&tenant_id))
                .await
                .map_err(|error| {
                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
                })?;
            let receipt = self
                .store
                .upsert(mnemara_core::UpsertRequest {
                    record,
                    idempotency_key: request.idempotency_key,
                })
                .await
                .map_err(map_store_error)?;
            // Only reached on success: the failure paths above return before any trace
            // is recorded in this method.
            record_trace(
                &self.runtime,
                TraceOperationKind::Upsert,
                "grpc",
                Some(tenant_id),
                Some(namespace),
                principal,
                started_at_unix_ms,
                TraceStatus::Ok,
                None,
                OperationTraceSummary {
                    record_id: Some(receipt.record_id.clone()),
                    ..OperationTraceSummary::default()
                },
                None,
                correlation_id,
            );

            Ok(Response::new(UpsertMemoryRecordReply {
                record_id: receipt.record_id,
                deduplicated: receipt.deduplicated,
                summary_refreshed: receipt.summary_refreshed,
            }))
        }
        .await;
        record_grpc_result(&self.runtime.metrics().grpc_upsert, result)
    }
2956
    /// Handles a batch upsert over gRPC.
    ///
    /// Steps, in order: mark the call as started in the `grpc_batch_upsert` metrics, check
    /// the caller for `AuthPermission::Write`, enforce the batch-size limit, convert and
    /// validate every record, acquire a write-class admission permit, forward the batch to
    /// the store, record a trace with `TraceStatus::Ok`, and reply with one receipt per
    /// store receipt.
    ///
    /// The tenant and namespace used for admission and tracing are taken from the first
    /// request in the batch only; an empty batch uses `None` for both.
    ///
    /// # Errors
    ///
    /// Returns the status produced by auth validation, the batch-size check, record
    /// conversion, limit validation, admission (mapped by `map_grpc_admission_error`) or the
    /// store (mapped by `map_store_error`).
    async fn batch_upsert_memory_records(
        &self,
        request: Request<BatchUpsertMemoryRecordsRequest>,
    ) -> Result<Response<BatchUpsertMemoryRecordsReply>, Status> {
        self.runtime.metrics().grpc_batch_upsert.record_started();
        // NOTE(review): an auth failure returns here via `?`, before `record_grpc_result`
        // runs, so the started call gets no recorded status — confirm this is intended.
        validate_grpc_auth(
            &request,
            self.runtime.auth().as_ref(),
            AuthPermission::Write,
        )?;
        // Captured before `request` is consumed inside the async block below.
        let principal = grpc_principal(&request);
        let started_at_unix_ms = now_unix_ms();
        let correlation_id = self.runtime.traces().next_id("corr");
        // The body runs in an async block so every `?` failure inside it is funnelled
        // through `record_grpc_result` instead of returning from the method directly.
        let result = async {
            let request = request.into_inner();
            // Reject oversized batches before converting any record.
            if request.requests.len() > self.runtime.limits().max_batch_upsert_requests {
                return Err(invalid_argument(format!(
                    "batch upsert request count exceeds configured max of {}",
                    self.runtime.limits().max_batch_upsert_requests
                )));
            }
            // Convert and validate each item; collecting into `Result` stops at the first
            // item that fails.
            let requests = request
                .requests
                .into_iter()
                .map(|request| {
                    let record = record_from_proto(
                        request
                            .record
                            .ok_or_else(|| invalid_argument("memory record is required"))?,
                    )?;
                    validate_record_limits(&record, self.runtime.limits().as_ref())?;
                    Ok(mnemara_core::UpsertRequest {
                        record,
                        idempotency_key: request.idempotency_key,
                    })
                })
                .collect::<Result<Vec<_>, Status>>()?;
            // NOTE(review): only the first item's scope is consulted; a batch that mixes
            // tenants is admitted and traced under the first tenant — confirm whether
            // mixed-tenant batches are possible.
            let tenant_id = requests
                .first()
                .map(|item| item.record.scope.tenant_id.clone());
            let namespace = requests
                .first()
                .map(|item| item.record.scope.namespace.clone());
            // Bound to a name so the permit lives until the end of the async block;
            // presumably it releases admission capacity on drop — verify in `admission`.
            let _permit = self
                .runtime
                .admission()
                .acquire(AdmissionClass::Write, tenant_id.as_deref())
                .await
                .map_err(|error| {
                    map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
                })?;

            let receipts: Vec<UpsertMemoryRecordReply> = self
                .store
                .batch_upsert(BatchUpsertRequest { requests })
                .await
                .map_err(map_store_error)?
                .into_iter()
                .map(|receipt| UpsertMemoryRecordReply {
                    record_id: receipt.record_id,
                    deduplicated: receipt.deduplicated,
                    summary_refreshed: receipt.summary_refreshed,
                })
                .collect();
            // Only reached on success: the failure paths above return before any trace
            // is recorded in this method.
            record_trace(
                &self.runtime,
                TraceOperationKind::BatchUpsert,
                "grpc",
                tenant_id,
                namespace,
                principal,
                started_at_unix_ms,
                TraceStatus::Ok,
                None,
                OperationTraceSummary {
                    // The count reflects receipts returned by the store, not items submitted.
                    request_count: Some(receipts.len() as u32),
                    ..OperationTraceSummary::default()
                },
                None,
                correlation_id,
            );

            Ok(Response::new(BatchUpsertMemoryRecordsReply { receipts }))
        }
        .await;
        record_grpc_result(&self.runtime.metrics().grpc_batch_upsert, result)
    }
3044
3045 async fn recall(
3046 &self,
3047 request: Request<ProtoRecallRequest>,
3048 ) -> Result<Response<RecallReply>, Status> {
3049 self.runtime.metrics().grpc_recall.record_started();
3050 validate_grpc_auth(&request, self.runtime.auth().as_ref(), AuthPermission::Read)?;
3051 let principal = grpc_principal(&request);
3052 let started_at_unix_ms = now_unix_ms();
3053 let correlation_id = self.runtime.traces().next_id("corr");
3054 let result = async {
3055 let request = request.into_inner();
3056 let query = RecallQuery {
3057 scope: scope_from_proto(
3058 request
3059 .scope
3060 .ok_or_else(|| invalid_argument("recall scope is required"))?,
3061 )?,
3062 query_text: request.query_text,
3063 max_items: request.max_items as usize,
3064 token_budget: request.token_budget.map(|value| value as usize),
3065 filters: recall_filters_from_proto(request.filters)?,
3066 include_explanation: request.include_explanation,
3067 };
3068 validate_recall_limits(&query, self.runtime.limits().as_ref())?;
3069 let tenant_id = query.scope.tenant_id.clone();
3070 let namespace = query.scope.namespace.clone();
3071 let query_text = query.query_text.clone();
3072 let max_items = query.max_items as u32;
3073 let token_budget = query.token_budget.map(|value| value as u32);
3074 let _permit = self
3075 .runtime
3076 .admission()
3077 .acquire(AdmissionClass::Read, Some(&tenant_id))
3078 .await
3079 .map_err(|error| {
3080 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3081 })?;
3082 let mut result = self.store.recall(query).await.map_err(map_store_error)?;
3083 attach_correlation_id(&mut result, &correlation_id);
3084 let recall_explanation = result.explanation.clone();
3085
3086 let hits = result
3087 .hits
3088 .into_iter()
3089 .map(|hit| ProtoRecallHit {
3090 record: Some(record_to_proto(hit.record)),
3091 breakdown: Some(recall_score_breakdown_to_proto(hit.breakdown)),
3092 selected_channels: hit
3093 .explanation
3094 .as_ref()
3095 .map(|value| value.selected_channels.clone())
3096 .unwrap_or_default(),
3097 policy_notes: hit
3098 .explanation
3099 .as_ref()
3100 .map(|value| value.policy_notes.clone())
3101 .unwrap_or_default(),
3102 explanation: hit.explanation.map(recall_explanation_to_proto),
3103 })
3104 .collect();
3105 record_trace(
3106 &self.runtime,
3107 TraceOperationKind::Recall,
3108 "grpc",
3109 Some(tenant_id),
3110 Some(namespace),
3111 principal,
3112 started_at_unix_ms,
3113 TraceStatus::Ok,
3114 None,
3115 OperationTraceSummary {
3116 query_text: Some(query_text),
3117 max_items: Some(max_items),
3118 token_budget,
3119 ..OperationTraceSummary::default()
3120 },
3121 recall_explanation,
3122 correlation_id,
3123 );
3124
3125 Ok(Response::new(RecallReply {
3126 hits,
3127 total_candidates_examined: result.total_candidates_examined as u32,
3128 explanation: result.explanation.map(recall_explanation_to_proto),
3129 }))
3130 }
3131 .await;
3132 record_grpc_result(&self.runtime.metrics().grpc_recall, result)
3133 }
3134
3135 async fn compact(
3136 &self,
3137 request: Request<ProtoCompactRequest>,
3138 ) -> Result<Response<CompactReply>, Status> {
3139 self.runtime.metrics().grpc_compact.record_started();
3140 validate_grpc_auth(
3141 &request,
3142 self.runtime.auth().as_ref(),
3143 AuthPermission::Admin,
3144 )?;
3145 let principal = grpc_principal(&request);
3146 let started_at_unix_ms = now_unix_ms();
3147 let correlation_id = self.runtime.traces().next_id("corr");
3148 let result = async {
3149 let request = request.into_inner();
3150 let tenant_id = request.tenant_id.clone();
3151 let namespace = request.namespace.clone();
3152 let _permit = self
3153 .runtime
3154 .admission()
3155 .acquire(AdmissionClass::Admin, Some(&tenant_id))
3156 .await
3157 .map_err(|error| {
3158 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3159 })?;
3160 let report = self
3161 .store
3162 .compact(CompactionRequest {
3163 tenant_id,
3164 namespace: namespace.clone(),
3165 dry_run: request.dry_run,
3166 reason: request.reason,
3167 })
3168 .await
3169 .map_err(map_store_error)?;
3170 record_trace(
3171 &self.runtime,
3172 TraceOperationKind::Compact,
3173 "grpc",
3174 Some(request.tenant_id),
3175 namespace,
3176 principal,
3177 started_at_unix_ms,
3178 TraceStatus::Ok,
3179 None,
3180 OperationTraceSummary {
3181 dry_run: Some(request.dry_run),
3182 ..OperationTraceSummary::default()
3183 },
3184 None,
3185 correlation_id,
3186 );
3187
3188 Ok(Response::new(CompactReply {
3189 deduplicated_records: report.deduplicated_records,
3190 archived_records: report.archived_records,
3191 summarized_clusters: report.summarized_clusters,
3192 pruned_graph_edges: report.pruned_graph_edges,
3193 superseded_records: report.superseded_records,
3194 lineage_links_created: report.lineage_links_created,
3195 dry_run: report.dry_run,
3196 }))
3197 }
3198 .await;
3199 record_grpc_result(&self.runtime.metrics().grpc_compact, result)
3200 }
3201
3202 async fn snapshot(
3203 &self,
3204 request: Request<SnapshotRequest>,
3205 ) -> Result<Response<SnapshotReply>, Status> {
3206 self.runtime.metrics().grpc_snapshot.record_started();
3207 validate_grpc_auth(
3208 &request,
3209 self.runtime.auth().as_ref(),
3210 AuthPermission::Admin,
3211 )?;
3212 let principal = grpc_principal(&request);
3213 let started_at_unix_ms = now_unix_ms();
3214 let correlation_id = self.runtime.traces().next_id("corr");
3215 let result = async {
3216 let _permit = self
3217 .runtime
3218 .admission()
3219 .acquire(AdmissionClass::Admin, None)
3220 .await
3221 .map_err(|error| {
3222 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3223 })?;
3224 let manifest = self.store.snapshot().await.map_err(map_store_error)?;
3225 record_trace(
3226 &self.runtime,
3227 TraceOperationKind::Snapshot,
3228 "grpc",
3229 None,
3230 None,
3231 principal,
3232 started_at_unix_ms,
3233 TraceStatus::Ok,
3234 None,
3235 OperationTraceSummary::default(),
3236 None,
3237 correlation_id,
3238 );
3239 Ok(Response::new(SnapshotReply {
3240 snapshot_id: manifest.snapshot_id,
3241 created_at_unix_ms: manifest.created_at_unix_ms,
3242 namespaces: manifest.namespaces,
3243 record_count: manifest.record_count,
3244 storage_bytes: manifest.storage_bytes,
3245 engine: Some(engine_tuning_info_to_proto(manifest.engine)),
3246 }))
3247 }
3248 .await;
3249 record_grpc_result(&self.runtime.metrics().grpc_snapshot, result)
3250 }
3251
3252 async fn delete(
3253 &self,
3254 request: Request<ProtoDeleteRequest>,
3255 ) -> Result<Response<DeleteReply>, Status> {
3256 self.runtime.metrics().grpc_delete.record_started();
3257 validate_grpc_auth(
3258 &request,
3259 self.runtime.auth().as_ref(),
3260 AuthPermission::Admin,
3261 )?;
3262 let principal = grpc_principal(&request);
3263 let started_at_unix_ms = now_unix_ms();
3264 let correlation_id = self.runtime.traces().next_id("corr");
3265 let result = async {
3266 let request = request.into_inner();
3267 let tenant_id = request.tenant_id.clone();
3268 let namespace = request.namespace.clone();
3269 let _permit = self
3270 .runtime
3271 .admission()
3272 .acquire(AdmissionClass::Admin, Some(&tenant_id))
3273 .await
3274 .map_err(|error| {
3275 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3276 })?;
3277 let receipt = self
3278 .store
3279 .delete(DeleteRequest {
3280 tenant_id,
3281 namespace: namespace.clone(),
3282 record_id: request.record_id,
3283 hard_delete: request.hard_delete,
3284 audit_reason: request.audit_reason,
3285 })
3286 .await
3287 .map_err(map_store_error)?;
3288 record_trace(
3289 &self.runtime,
3290 TraceOperationKind::Delete,
3291 "grpc",
3292 Some(request.tenant_id),
3293 Some(namespace),
3294 principal,
3295 started_at_unix_ms,
3296 TraceStatus::Ok,
3297 None,
3298 OperationTraceSummary {
3299 record_id: Some(receipt.record_id.clone()),
3300 ..OperationTraceSummary::default()
3301 },
3302 None,
3303 correlation_id,
3304 );
3305
3306 Ok(Response::new(DeleteReply {
3307 record_id: receipt.record_id,
3308 tombstoned: receipt.tombstoned,
3309 hard_deleted: receipt.hard_deleted,
3310 }))
3311 }
3312 .await;
3313 record_grpc_result(&self.runtime.metrics().grpc_delete, result)
3314 }
3315
3316 async fn archive(
3317 &self,
3318 request: Request<ProtoArchiveRequest>,
3319 ) -> Result<Response<ArchiveReply>, Status> {
3320 self.runtime.metrics().grpc_archive.record_started();
3321 validate_grpc_auth(
3322 &request,
3323 self.runtime.auth().as_ref(),
3324 AuthPermission::Admin,
3325 )?;
3326 let principal = grpc_principal(&request);
3327 let started_at_unix_ms = now_unix_ms();
3328 let correlation_id = self.runtime.traces().next_id("corr");
3329 let result = async {
3330 let request = request.into_inner();
3331 let tenant_id = request.tenant_id.clone();
3332 let namespace = request.namespace.clone();
3333 let _permit = self
3334 .runtime
3335 .admission()
3336 .acquire(AdmissionClass::Admin, Some(&tenant_id))
3337 .await
3338 .map_err(|error| {
3339 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3340 })?;
3341 let receipt = self
3342 .store
3343 .archive(ArchiveRequest {
3344 tenant_id,
3345 namespace: namespace.clone(),
3346 record_id: request.record_id,
3347 dry_run: request.dry_run,
3348 audit_reason: request.audit_reason,
3349 })
3350 .await
3351 .map_err(map_store_error)?;
3352 record_trace(
3353 &self.runtime,
3354 TraceOperationKind::Archive,
3355 "grpc",
3356 Some(request.tenant_id),
3357 Some(namespace),
3358 principal,
3359 started_at_unix_ms,
3360 TraceStatus::Ok,
3361 None,
3362 OperationTraceSummary {
3363 record_id: Some(receipt.record_id.clone()),
3364 dry_run: Some(receipt.dry_run),
3365 ..OperationTraceSummary::default()
3366 },
3367 None,
3368 correlation_id,
3369 );
3370
3371 Ok(Response::new(ArchiveReply {
3372 record_id: receipt.record_id,
3373 previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3374 previous_historical_state: historical_state_to_proto(
3375 receipt.previous_historical_state,
3376 ),
3377 quality_state: quality_state_to_proto(receipt.quality_state),
3378 historical_state: historical_state_to_proto(receipt.historical_state),
3379 changed: receipt.changed,
3380 dry_run: receipt.dry_run,
3381 }))
3382 }
3383 .await;
3384 record_grpc_result(&self.runtime.metrics().grpc_archive, result)
3385 }
3386
3387 async fn suppress(
3388 &self,
3389 request: Request<ProtoSuppressRequest>,
3390 ) -> Result<Response<SuppressReply>, Status> {
3391 self.runtime.metrics().grpc_suppress.record_started();
3392 validate_grpc_auth(
3393 &request,
3394 self.runtime.auth().as_ref(),
3395 AuthPermission::Admin,
3396 )?;
3397 let principal = grpc_principal(&request);
3398 let started_at_unix_ms = now_unix_ms();
3399 let correlation_id = self.runtime.traces().next_id("corr");
3400 let result = async {
3401 let request = request.into_inner();
3402 let tenant_id = request.tenant_id.clone();
3403 let namespace = request.namespace.clone();
3404 let _permit = self
3405 .runtime
3406 .admission()
3407 .acquire(AdmissionClass::Admin, Some(&tenant_id))
3408 .await
3409 .map_err(|error| {
3410 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3411 })?;
3412 let receipt = self
3413 .store
3414 .suppress(SuppressRequest {
3415 tenant_id,
3416 namespace: namespace.clone(),
3417 record_id: request.record_id,
3418 dry_run: request.dry_run,
3419 audit_reason: request.audit_reason,
3420 })
3421 .await
3422 .map_err(map_store_error)?;
3423 record_trace(
3424 &self.runtime,
3425 TraceOperationKind::Suppress,
3426 "grpc",
3427 Some(request.tenant_id),
3428 Some(namespace),
3429 principal,
3430 started_at_unix_ms,
3431 TraceStatus::Ok,
3432 None,
3433 OperationTraceSummary {
3434 record_id: Some(receipt.record_id.clone()),
3435 dry_run: Some(receipt.dry_run),
3436 ..OperationTraceSummary::default()
3437 },
3438 None,
3439 correlation_id,
3440 );
3441
3442 Ok(Response::new(SuppressReply {
3443 record_id: receipt.record_id,
3444 previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3445 previous_historical_state: historical_state_to_proto(
3446 receipt.previous_historical_state,
3447 ),
3448 quality_state: quality_state_to_proto(receipt.quality_state),
3449 historical_state: historical_state_to_proto(receipt.historical_state),
3450 changed: receipt.changed,
3451 dry_run: receipt.dry_run,
3452 }))
3453 }
3454 .await;
3455 record_grpc_result(&self.runtime.metrics().grpc_suppress, result)
3456 }
3457
3458 async fn recover(
3459 &self,
3460 request: Request<ProtoRecoverRequest>,
3461 ) -> Result<Response<RecoverReply>, Status> {
3462 self.runtime.metrics().grpc_recover.record_started();
3463 validate_grpc_auth(
3464 &request,
3465 self.runtime.auth().as_ref(),
3466 AuthPermission::Admin,
3467 )?;
3468 let principal = grpc_principal(&request);
3469 let started_at_unix_ms = now_unix_ms();
3470 let correlation_id = self.runtime.traces().next_id("corr");
3471 let result = async {
3472 let request = request.into_inner();
3473 let tenant_id = request.tenant_id.clone();
3474 let namespace = request.namespace.clone();
3475 let _permit = self
3476 .runtime
3477 .admission()
3478 .acquire(AdmissionClass::Admin, Some(&tenant_id))
3479 .await
3480 .map_err(|error| {
3481 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3482 })?;
3483 let receipt = self
3484 .store
3485 .recover(RecoverRequest {
3486 tenant_id,
3487 namespace: namespace.clone(),
3488 record_id: request.record_id,
3489 dry_run: request.dry_run,
3490 audit_reason: request.audit_reason,
3491 quality_state: quality_state_from_proto(&request.quality_state)?,
3492 historical_state: request
3493 .historical_state
3494 .as_deref()
3495 .map(historical_state_from_proto)
3496 .transpose()?,
3497 })
3498 .await
3499 .map_err(map_store_error)?;
3500 record_trace(
3501 &self.runtime,
3502 TraceOperationKind::Recover,
3503 "grpc",
3504 Some(request.tenant_id),
3505 Some(namespace),
3506 principal,
3507 started_at_unix_ms,
3508 TraceStatus::Ok,
3509 None,
3510 OperationTraceSummary {
3511 record_id: Some(receipt.record_id.clone()),
3512 dry_run: Some(receipt.dry_run),
3513 ..OperationTraceSummary::default()
3514 },
3515 None,
3516 correlation_id,
3517 );
3518
3519 Ok(Response::new(RecoverReply {
3520 record_id: receipt.record_id,
3521 previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3522 previous_historical_state: historical_state_to_proto(
3523 receipt.previous_historical_state,
3524 ),
3525 quality_state: quality_state_to_proto(receipt.quality_state),
3526 historical_state: historical_state_to_proto(receipt.historical_state),
3527 changed: receipt.changed,
3528 dry_run: receipt.dry_run,
3529 }))
3530 }
3531 .await;
3532 record_grpc_result(&self.runtime.metrics().grpc_recover, result)
3533 }
3534
3535 async fn stats(
3536 &self,
3537 request: Request<ProtoStoreStatsRequest>,
3538 ) -> Result<Response<ProtoStoreStatsReply>, Status> {
3539 self.runtime.metrics().grpc_stats.record_started();
3540 validate_grpc_auth(
3541 &request,
3542 self.runtime.auth().as_ref(),
3543 AuthPermission::Admin,
3544 )?;
3545 let principal = grpc_principal(&request);
3546 let started_at_unix_ms = now_unix_ms();
3547 let correlation_id = self.runtime.traces().next_id("corr");
3548 let result = async {
3549 let request = request.into_inner();
3550 let tenant_id = request.tenant_id.clone();
3551 let namespace = request.namespace.clone();
3552 let _permit = self
3553 .runtime
3554 .admission()
3555 .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3556 .await
3557 .map_err(|error| {
3558 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3559 })?;
3560 let report = self
3561 .store
3562 .stats(StoreStatsRequest {
3563 tenant_id: tenant_id.clone(),
3564 namespace: namespace.clone(),
3565 })
3566 .await
3567 .map_err(map_store_error)?;
3568 record_trace(
3569 &self.runtime,
3570 TraceOperationKind::Stats,
3571 "grpc",
3572 tenant_id,
3573 namespace,
3574 principal,
3575 started_at_unix_ms,
3576 TraceStatus::Ok,
3577 None,
3578 OperationTraceSummary::default(),
3579 None,
3580 correlation_id,
3581 );
3582
3583 Ok(Response::new(ProtoStoreStatsReply {
3584 generated_at_unix_ms: report.generated_at_unix_ms,
3585 total_records: report.total_records,
3586 storage_bytes: report.storage_bytes,
3587 namespaces: report
3588 .namespaces
3589 .into_iter()
3590 .map(namespace_stats_to_proto)
3591 .collect(),
3592 maintenance: Some(maintenance_stats_to_proto(report.maintenance)),
3593 engine: Some(engine_tuning_info_to_proto(report.engine)),
3594 }))
3595 }
3596 .await;
3597 record_grpc_result(&self.runtime.metrics().grpc_stats, result)
3598 }
3599
3600 async fn integrity_check(
3601 &self,
3602 request: Request<ProtoIntegrityCheckRequest>,
3603 ) -> Result<Response<ProtoIntegrityCheckReply>, Status> {
3604 self.runtime.metrics().grpc_integrity.record_started();
3605 validate_grpc_auth(
3606 &request,
3607 self.runtime.auth().as_ref(),
3608 AuthPermission::Admin,
3609 )?;
3610 let principal = grpc_principal(&request);
3611 let started_at_unix_ms = now_unix_ms();
3612 let correlation_id = self.runtime.traces().next_id("corr");
3613 let result = async {
3614 let request = request.into_inner();
3615 let tenant_id = request.tenant_id.clone();
3616 let namespace = request.namespace.clone();
3617 let _permit = self
3618 .runtime
3619 .admission()
3620 .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3621 .await
3622 .map_err(|error| {
3623 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3624 })?;
3625 let report = self
3626 .store
3627 .integrity_check(IntegrityCheckRequest {
3628 tenant_id: tenant_id.clone(),
3629 namespace: namespace.clone(),
3630 })
3631 .await
3632 .map_err(map_store_error)?;
3633 record_trace(
3634 &self.runtime,
3635 TraceOperationKind::IntegrityCheck,
3636 "grpc",
3637 tenant_id,
3638 namespace,
3639 principal,
3640 started_at_unix_ms,
3641 TraceStatus::Ok,
3642 None,
3643 OperationTraceSummary::default(),
3644 None,
3645 correlation_id,
3646 );
3647
3648 Ok(Response::new(ProtoIntegrityCheckReply {
3649 generated_at_unix_ms: report.generated_at_unix_ms,
3650 healthy: report.healthy,
3651 scanned_records: report.scanned_records,
3652 scanned_idempotency_keys: report.scanned_idempotency_keys,
3653 stale_idempotency_keys: report.stale_idempotency_keys,
3654 missing_idempotency_keys: report.missing_idempotency_keys,
3655 duplicate_active_records: report.duplicate_active_records,
3656 }))
3657 }
3658 .await;
3659 record_grpc_result(&self.runtime.metrics().grpc_integrity, result)
3660 }
3661
3662 async fn repair(
3663 &self,
3664 request: Request<ProtoRepairRequest>,
3665 ) -> Result<Response<ProtoRepairReply>, Status> {
3666 self.runtime.metrics().grpc_repair.record_started();
3667 validate_grpc_auth(
3668 &request,
3669 self.runtime.auth().as_ref(),
3670 AuthPermission::Admin,
3671 )?;
3672 let principal = grpc_principal(&request);
3673 let started_at_unix_ms = now_unix_ms();
3674 let correlation_id = self.runtime.traces().next_id("corr");
3675 let result = async {
3676 let request = request.into_inner();
3677 let tenant_id = request.tenant_id.clone();
3678 let namespace = request.namespace.clone();
3679 let _permit = self
3680 .runtime
3681 .admission()
3682 .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3683 .await
3684 .map_err(|error| {
3685 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3686 })?;
3687 let report = self
3688 .store
3689 .repair(RepairRequest {
3690 tenant_id: tenant_id.clone(),
3691 namespace: namespace.clone(),
3692 dry_run: request.dry_run,
3693 reason: request.reason,
3694 remove_stale_idempotency_keys: request.remove_stale_idempotency_keys,
3695 rebuild_missing_idempotency_keys: request.rebuild_missing_idempotency_keys,
3696 })
3697 .await
3698 .map_err(map_store_error)?;
3699 record_trace(
3700 &self.runtime,
3701 TraceOperationKind::Repair,
3702 "grpc",
3703 tenant_id,
3704 namespace,
3705 principal,
3706 started_at_unix_ms,
3707 TraceStatus::Ok,
3708 None,
3709 OperationTraceSummary {
3710 dry_run: Some(request.dry_run),
3711 ..OperationTraceSummary::default()
3712 },
3713 None,
3714 correlation_id,
3715 );
3716
3717 Ok(Response::new(ProtoRepairReply {
3718 dry_run: report.dry_run,
3719 scanned_records: report.scanned_records,
3720 scanned_idempotency_keys: report.scanned_idempotency_keys,
3721 removed_stale_idempotency_keys: report.removed_stale_idempotency_keys,
3722 rebuilt_missing_idempotency_keys: report.rebuilt_missing_idempotency_keys,
3723 healthy_after: report.healthy_after,
3724 }))
3725 }
3726 .await;
3727 record_grpc_result(&self.runtime.metrics().grpc_repair, result)
3728 }
3729
3730 async fn list_traces(
3731 &self,
3732 request: Request<ProtoListTracesRequest>,
3733 ) -> Result<Response<ProtoListTracesReply>, Status> {
3734 validate_grpc_auth(
3735 &request,
3736 self.runtime.auth().as_ref(),
3737 AuthPermission::Admin,
3738 )?;
3739 let request = request.into_inner();
3740 self.runtime
3741 .metrics()
3742 .trace_reads
3743 .fetch_add(1, Ordering::Relaxed);
3744 let traces = self.runtime.traces().list(&TraceListRequest {
3745 tenant_id: request.tenant_id,
3746 namespace: request.namespace,
3747 operation: ProtoTraceOperationKind::try_from(request.operation)
3748 .ok()
3749 .and_then(trace_operation_kind_from_proto),
3750 status: ProtoTraceStatus::try_from(request.status)
3751 .ok()
3752 .and_then(trace_status_from_proto),
3753 before_started_at_unix_ms: request.before_started_at_unix_ms,
3754 limit: request.limit.map(|value| value as usize),
3755 });
3756 Ok(Response::new(ProtoListTracesReply {
3757 traces: traces.into_iter().map(operation_trace_to_proto).collect(),
3758 }))
3759 }
3760
3761 async fn get_trace(
3762 &self,
3763 request: Request<ProtoGetTraceRequest>,
3764 ) -> Result<Response<ProtoOperationTrace>, Status> {
3765 validate_grpc_auth(
3766 &request,
3767 self.runtime.auth().as_ref(),
3768 AuthPermission::Admin,
3769 )?;
3770 let trace_id = request.into_inner().trace_id;
3771 self.runtime
3772 .metrics()
3773 .trace_reads
3774 .fetch_add(1, Ordering::Relaxed);
3775 let trace = self
3776 .runtime
3777 .traces()
3778 .get(&trace_id)
3779 .ok_or_else(|| Status::not_found(format!("trace {trace_id} not found")))?;
3780 Ok(Response::new(operation_trace_to_proto(trace)))
3781 }
3782
3783 async fn export(
3784 &self,
3785 request: Request<ProtoExportRequest>,
3786 ) -> Result<Response<ProtoExportReply>, Status> {
3787 validate_grpc_auth(
3788 &request,
3789 self.runtime.auth().as_ref(),
3790 AuthPermission::Admin,
3791 )?;
3792 let principal = grpc_principal(&request);
3793 let started_at_unix_ms = now_unix_ms();
3794 let correlation_id = self.runtime.traces().next_id("corr");
3795 let request = request.into_inner();
3796 let export_request = ExportRequest {
3797 tenant_id: request.tenant_id,
3798 namespace: request.namespace,
3799 include_archived: request.include_archived,
3800 };
3801 let _permit = self
3802 .runtime
3803 .admission()
3804 .acquire(AdmissionClass::Admin, export_request.tenant_id.as_deref())
3805 .await
3806 .map_err(|error| map_grpc_admission_error(self.runtime.metrics().as_ref(), error))?;
3807 let package = self
3808 .store
3809 .export(export_request.clone())
3810 .await
3811 .map_err(map_store_error)?;
3812 record_trace(
3813 &self.runtime,
3814 TraceOperationKind::Export,
3815 "grpc",
3816 export_request.tenant_id,
3817 export_request.namespace,
3818 principal,
3819 started_at_unix_ms,
3820 TraceStatus::Ok,
3821 None,
3822 OperationTraceSummary::default(),
3823 None,
3824 correlation_id,
3825 );
3826 Ok(Response::new(portable_package_to_proto(package)))
3827 }
3828
3829 async fn import(
3830 &self,
3831 request: Request<ProtoImportRequest>,
3832 ) -> Result<Response<ProtoImportReply>, Status> {
3833 validate_grpc_auth(
3834 &request,
3835 self.runtime.auth().as_ref(),
3836 AuthPermission::Admin,
3837 )?;
3838 let principal = grpc_principal(&request);
3839 let started_at_unix_ms = now_unix_ms();
3840 let correlation_id = self.runtime.traces().next_id("corr");
3841 let request = request.into_inner();
3842 let import_request = ImportRequest {
3843 package: portable_package_from_proto(
3844 request
3845 .package
3846 .ok_or_else(|| invalid_argument("portable package is required"))?,
3847 )?,
3848 mode: import_mode_from_proto(request.mode)?,
3849 dry_run: request.dry_run,
3850 };
3851 let tenant_id = import_request
3852 .package
3853 .records
3854 .first()
3855 .map(|entry| entry.record.scope.tenant_id.clone());
3856 let namespace = import_request
3857 .package
3858 .records
3859 .first()
3860 .map(|entry| entry.record.scope.namespace.clone());
3861 let _permit = self
3862 .runtime
3863 .admission()
3864 .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3865 .await
3866 .map_err(|error| map_grpc_admission_error(self.runtime.metrics().as_ref(), error))?;
3867 let report = self
3868 .store
3869 .import(import_request.clone())
3870 .await
3871 .map_err(map_store_error)?;
3872 record_trace(
3873 &self.runtime,
3874 TraceOperationKind::Import,
3875 "grpc",
3876 tenant_id,
3877 namespace,
3878 principal,
3879 started_at_unix_ms,
3880 if report.failed_records.is_empty() {
3881 TraceStatus::Ok
3882 } else {
3883 TraceStatus::Error
3884 },
3885 (!report.failed_records.is_empty())
3886 .then(|| format!("{} import validation failures", report.failed_records.len())),
3887 OperationTraceSummary {
3888 request_count: Some(import_request.package.records.len() as u32),
3889 dry_run: Some(import_request.dry_run),
3890 ..OperationTraceSummary::default()
3891 },
3892 None,
3893 correlation_id,
3894 );
3895 Ok(Response::new(ProtoImportReply {
3896 mode: request.mode,
3897 dry_run: report.dry_run,
3898 applied: report.applied,
3899 compatible_package: report.compatible_package,
3900 package_version: report.package_version,
3901 validated_records: report.validated_records,
3902 imported_records: report.imported_records,
3903 skipped_records: report.skipped_records,
3904 replaced_existing: report.replaced_existing,
3905 snapshot_id: report.snapshot_id,
3906 failed_records: report
3907 .failed_records
3908 .into_iter()
3909 .map(|failure| mnemara_protocol::v1::ImportFailure {
3910 record_id: failure.record_id,
3911 reason: failure.reason,
3912 })
3913 .collect(),
3914 }))
3915 }
3916}