1#![forbid(unsafe_code)]
2
3mod admission;
4mod observability;
5
6use std::net::SocketAddr;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9
10use admission::{
11 AdmissionClass, AdmissionConfig, AdmissionController, AdmissionError, RuntimeAdmissionStatus,
12};
13use axum::extract::{Path, Query, Request as AxumRequest};
14use axum::http::HeaderMap;
15use axum::http::StatusCode;
16use axum::middleware::{self, Next};
17use axum::response::IntoResponse;
18use axum::routing::{get, post};
19use axum::{Json, Router};
20use mnemara_core::{
21 AffectiveAnnotation, AffectiveAnnotationProvenance, ArchiveReceipt, ArchiveRequest,
22 ArtifactPointer, BatchUpsertRequest, CompactionReport, CompactionRequest, ConflictAnnotation,
23 ConflictResolutionKind, ConflictReviewState, DeleteReceipt, DeleteRequest,
24 EPISODE_SCHEMA_VERSION, EmbeddingProviderKind, EngineTuningInfo, EpisodeContext,
25 EpisodeContinuityState, EpisodeSalience, ExportRequest, ImportMode, ImportReport,
26 ImportRequest, IntegrityCheckReport, IntegrityCheckRequest, LineageLink, LineageRelationKind,
27 MaintenanceStats, MemoryHistoricalState, MemoryQualityState, MemoryRecord, MemoryRecordKind,
28 MemoryScope, MemoryStore, MemoryTrustLevel, NamespaceStats, OperationTrace,
29 OperationTraceSummary, PortableRecord, PortableStorePackage, RecallCandidateSource,
30 RecallExplanation, RecallFilters, RecallHistoricalMode, RecallPlannerStage,
31 RecallPlanningProfile, RecallPlanningTrace, RecallPolicyProfile, RecallQuery, RecallResult,
32 RecallScorerKind, RecallScoringProfile, RecallTemporalOrder, RecallTraceCandidate,
33 RecoverReceipt, RecoverRequest, RepairReport, RepairRequest, SnapshotManifest,
34 StoreStatsReport, StoreStatsRequest, SuppressReceipt, SuppressRequest, TraceListRequest,
35 TraceOperationKind, TraceStatus, UpsertReceipt, UpsertRequest,
36};
37use mnemara_protocol::v1::memory_service_server::{MemoryService, MemoryServiceServer};
38use mnemara_protocol::v1::{
39 AffectiveAnnotation as ProtoAffectiveAnnotation, ArchiveReply,
40 ArchiveRequest as ProtoArchiveRequest, ArtifactPointer as ProtoArtifactPointer,
41 BatchUpsertMemoryRecordsReply, BatchUpsertMemoryRecordsRequest, CompactReply,
42 CompactRequest as ProtoCompactRequest, ConflictAnnotation as ProtoConflictAnnotation,
43 DeleteReply, DeleteRequest as ProtoDeleteRequest,
44 EmbeddingProviderKind as ProtoEmbeddingProviderKind, EngineTuningInfo as ProtoEngineTuningInfo,
45 EpisodeContext as ProtoEpisodeContext, EpisodeSalience as ProtoEpisodeSalience,
46 ExportReply as ProtoExportReply, ExportRequest as ProtoExportRequest,
47 GetTraceRequest as ProtoGetTraceRequest, ImportMode as ProtoImportMode,
48 ImportReply as ProtoImportReply, ImportRequest as ProtoImportRequest,
49 IntegrityCheckReply as ProtoIntegrityCheckReply,
50 IntegrityCheckRequest as ProtoIntegrityCheckRequest, LineageLink as ProtoLineageLink,
51 ListTracesReply as ProtoListTracesReply, ListTracesRequest as ProtoListTracesRequest,
52 MaintenanceStats as ProtoMaintenanceStats, MemoryRecord as ProtoMemoryRecord,
53 MemoryScope as ProtoMemoryScope, NamespaceStats as ProtoNamespaceStats,
54 OperationTrace as ProtoOperationTrace, OperationTraceSummary as ProtoOperationTraceSummary,
55 PortableRecord as ProtoPortableRecord, RecallExplanation as ProtoRecallExplanation,
56 RecallFilters as ProtoRecallFilters, RecallHit as ProtoRecallHit,
57 RecallPlanningTrace as ProtoRecallPlanningTrace,
58 RecallPolicyProfile as ProtoRecallPolicyProfile, RecallReply,
59 RecallRequest as ProtoRecallRequest, RecallScoreBreakdown as ProtoRecallScoreBreakdown,
60 RecallScorerKind as ProtoRecallScorerKind, RecallScoringProfile as ProtoRecallScoringProfile,
61 RecallTraceCandidate as ProtoRecallTraceCandidate, RecoverReply,
62 RecoverRequest as ProtoRecoverRequest, RepairReply as ProtoRepairReply,
63 RepairRequest as ProtoRepairRequest, SnapshotReply, SnapshotRequest,
64 StoreStatsReply as ProtoStoreStatsReply, StoreStatsRequest as ProtoStoreStatsRequest,
65 SuppressReply, SuppressRequest as ProtoSuppressRequest,
66 TraceOperationKind as ProtoTraceOperationKind, TraceStatus as ProtoTraceStatus,
67 UpsertMemoryRecordReply, UpsertMemoryRecordRequest as ProtoUpsertMemoryRecordRequest,
68};
69use observability::{TraceRegistry, TraceRegistrySnapshot, now_unix_ms};
70use tonic::{Request, Response, Status};
71
72#[derive(Debug, Clone, serde::Serialize)]
73struct HealthStatus {
74 status: &'static str,
75}
76
77#[derive(Debug, Clone, serde::Serialize)]
78struct ReadyStatus {
79 status: &'static str,
80 record_count: u64,
81 namespaces: Vec<String>,
82}
83
84#[derive(Debug, Clone, serde::Serialize)]
85struct ServerRuntimeStatus {
86 backend: String,
87 admission: RuntimeAdmissionStatus,
88 traces: TraceRegistrySnapshot,
89}
90
91#[derive(Debug, Clone, serde::Serialize)]
92struct HttpErrorBody {
93 error: String,
94}
95
96#[derive(Debug, Default)]
97struct MethodMetrics {
98 started: AtomicU64,
99 ok: AtomicU64,
100 invalid_argument: AtomicU64,
101 conflict: AtomicU64,
102 unimplemented: AtomicU64,
103 internal: AtomicU64,
104}
105
106impl MethodMetrics {
107 fn record_started(&self) {
108 self.started.fetch_add(1, Ordering::Relaxed);
109 }
110
111 fn record_status(&self, status: &Status) {
112 match status.code() {
113 tonic::Code::Ok => {
114 self.ok.fetch_add(1, Ordering::Relaxed);
115 }
116 tonic::Code::InvalidArgument => {
117 self.invalid_argument.fetch_add(1, Ordering::Relaxed);
118 }
119 tonic::Code::AlreadyExists => {
120 self.conflict.fetch_add(1, Ordering::Relaxed);
121 }
122 tonic::Code::Unimplemented => {
123 self.unimplemented.fetch_add(1, Ordering::Relaxed);
124 }
125 _ => {
126 self.internal.fetch_add(1, Ordering::Relaxed);
127 }
128 }
129 }
130}
131
132#[derive(Debug, Default)]
133pub struct ServerMetrics {
134 grpc_upsert: MethodMetrics,
135 grpc_batch_upsert: MethodMetrics,
136 grpc_recall: MethodMetrics,
137 grpc_compact: MethodMetrics,
138 grpc_snapshot: MethodMetrics,
139 grpc_delete: MethodMetrics,
140 grpc_archive: MethodMetrics,
141 grpc_suppress: MethodMetrics,
142 grpc_recover: MethodMetrics,
143 grpc_stats: MethodMetrics,
144 grpc_integrity: MethodMetrics,
145 grpc_repair: MethodMetrics,
146 http_healthz: AtomicU64,
147 http_readyz: AtomicU64,
148 http_snapshot: AtomicU64,
149 http_compact: AtomicU64,
150 http_delete: AtomicU64,
151 http_archive: AtomicU64,
152 http_suppress: AtomicU64,
153 http_recover: AtomicU64,
154 http_metrics: AtomicU64,
155 http_traces: AtomicU64,
156 http_runtime: AtomicU64,
157 http_export: AtomicU64,
158 http_import: AtomicU64,
159 http_rejected_body_too_large: AtomicU64,
160 admission_rejected: AtomicU64,
161 admission_timed_out: AtomicU64,
162 trace_records: AtomicU64,
163 trace_evictions: AtomicU64,
164 trace_reads: AtomicU64,
165}
166
167impl ServerMetrics {
168 pub fn render(&self) -> String {
169 let mut output = String::new();
170 append_counter(
171 &mut output,
172 "mnemara_grpc_upsert_requests_started_total",
173 self.grpc_upsert.started.load(Ordering::Relaxed),
174 );
175 append_counter(
176 &mut output,
177 "mnemara_grpc_upsert_requests_ok_total",
178 self.grpc_upsert.ok.load(Ordering::Relaxed),
179 );
180 append_counter(
181 &mut output,
182 "mnemara_grpc_upsert_requests_invalid_argument_total",
183 self.grpc_upsert.invalid_argument.load(Ordering::Relaxed),
184 );
185 append_counter(
186 &mut output,
187 "mnemara_grpc_upsert_requests_conflict_total",
188 self.grpc_upsert.conflict.load(Ordering::Relaxed),
189 );
190 append_counter(
191 &mut output,
192 "mnemara_grpc_upsert_requests_unimplemented_total",
193 self.grpc_upsert.unimplemented.load(Ordering::Relaxed),
194 );
195 append_counter(
196 &mut output,
197 "mnemara_grpc_upsert_requests_internal_total",
198 self.grpc_upsert.internal.load(Ordering::Relaxed),
199 );
200 append_counter(
201 &mut output,
202 "mnemara_grpc_batch_upsert_requests_started_total",
203 self.grpc_batch_upsert.started.load(Ordering::Relaxed),
204 );
205 append_counter(
206 &mut output,
207 "mnemara_grpc_batch_upsert_requests_ok_total",
208 self.grpc_batch_upsert.ok.load(Ordering::Relaxed),
209 );
210 append_counter(
211 &mut output,
212 "mnemara_grpc_batch_upsert_requests_invalid_argument_total",
213 self.grpc_batch_upsert
214 .invalid_argument
215 .load(Ordering::Relaxed),
216 );
217 append_counter(
218 &mut output,
219 "mnemara_grpc_recall_requests_started_total",
220 self.grpc_recall.started.load(Ordering::Relaxed),
221 );
222 append_counter(
223 &mut output,
224 "mnemara_grpc_recall_requests_ok_total",
225 self.grpc_recall.ok.load(Ordering::Relaxed),
226 );
227 append_counter(
228 &mut output,
229 "mnemara_grpc_recall_requests_invalid_argument_total",
230 self.grpc_recall.invalid_argument.load(Ordering::Relaxed),
231 );
232 append_counter(
233 &mut output,
234 "mnemara_grpc_compact_requests_started_total",
235 self.grpc_compact.started.load(Ordering::Relaxed),
236 );
237 append_counter(
238 &mut output,
239 "mnemara_grpc_compact_requests_ok_total",
240 self.grpc_compact.ok.load(Ordering::Relaxed),
241 );
242 append_counter(
243 &mut output,
244 "mnemara_grpc_snapshot_requests_started_total",
245 self.grpc_snapshot.started.load(Ordering::Relaxed),
246 );
247 append_counter(
248 &mut output,
249 "mnemara_grpc_snapshot_requests_ok_total",
250 self.grpc_snapshot.ok.load(Ordering::Relaxed),
251 );
252 append_counter(
253 &mut output,
254 "mnemara_grpc_delete_requests_started_total",
255 self.grpc_delete.started.load(Ordering::Relaxed),
256 );
257 append_counter(
258 &mut output,
259 "mnemara_grpc_delete_requests_ok_total",
260 self.grpc_delete.ok.load(Ordering::Relaxed),
261 );
262 append_counter(
263 &mut output,
264 "mnemara_grpc_archive_requests_started_total",
265 self.grpc_archive.started.load(Ordering::Relaxed),
266 );
267 append_counter(
268 &mut output,
269 "mnemara_grpc_archive_requests_ok_total",
270 self.grpc_archive.ok.load(Ordering::Relaxed),
271 );
272 append_counter(
273 &mut output,
274 "mnemara_grpc_suppress_requests_started_total",
275 self.grpc_suppress.started.load(Ordering::Relaxed),
276 );
277 append_counter(
278 &mut output,
279 "mnemara_grpc_suppress_requests_ok_total",
280 self.grpc_suppress.ok.load(Ordering::Relaxed),
281 );
282 append_counter(
283 &mut output,
284 "mnemara_grpc_recover_requests_started_total",
285 self.grpc_recover.started.load(Ordering::Relaxed),
286 );
287 append_counter(
288 &mut output,
289 "mnemara_grpc_recover_requests_ok_total",
290 self.grpc_recover.ok.load(Ordering::Relaxed),
291 );
292 append_counter(
293 &mut output,
294 "mnemara_http_healthz_requests_total",
295 self.http_healthz.load(Ordering::Relaxed),
296 );
297 append_counter(
298 &mut output,
299 "mnemara_http_readyz_requests_total",
300 self.http_readyz.load(Ordering::Relaxed),
301 );
302 append_counter(
303 &mut output,
304 "mnemara_http_snapshot_requests_total",
305 self.http_snapshot.load(Ordering::Relaxed),
306 );
307 append_counter(
308 &mut output,
309 "mnemara_http_compact_requests_total",
310 self.http_compact.load(Ordering::Relaxed),
311 );
312 append_counter(
313 &mut output,
314 "mnemara_http_delete_requests_total",
315 self.http_delete.load(Ordering::Relaxed),
316 );
317 append_counter(
318 &mut output,
319 "mnemara_http_archive_requests_total",
320 self.http_archive.load(Ordering::Relaxed),
321 );
322 append_counter(
323 &mut output,
324 "mnemara_http_suppress_requests_total",
325 self.http_suppress.load(Ordering::Relaxed),
326 );
327 append_counter(
328 &mut output,
329 "mnemara_http_recover_requests_total",
330 self.http_recover.load(Ordering::Relaxed),
331 );
332 append_counter(
333 &mut output,
334 "mnemara_http_metrics_requests_total",
335 self.http_metrics.load(Ordering::Relaxed),
336 );
337 append_counter(
338 &mut output,
339 "mnemara_http_traces_requests_total",
340 self.http_traces.load(Ordering::Relaxed),
341 );
342 append_counter(
343 &mut output,
344 "mnemara_http_runtime_requests_total",
345 self.http_runtime.load(Ordering::Relaxed),
346 );
347 append_counter(
348 &mut output,
349 "mnemara_http_export_requests_total",
350 self.http_export.load(Ordering::Relaxed),
351 );
352 append_counter(
353 &mut output,
354 "mnemara_http_import_requests_total",
355 self.http_import.load(Ordering::Relaxed),
356 );
357 append_counter(
358 &mut output,
359 "mnemara_http_rejected_body_too_large_total",
360 self.http_rejected_body_too_large.load(Ordering::Relaxed),
361 );
362 append_counter(
363 &mut output,
364 "mnemara_admission_rejected_total",
365 self.admission_rejected.load(Ordering::Relaxed),
366 );
367 append_counter(
368 &mut output,
369 "mnemara_admission_timed_out_total",
370 self.admission_timed_out.load(Ordering::Relaxed),
371 );
372 append_counter(
373 &mut output,
374 "mnemara_trace_records_total",
375 self.trace_records.load(Ordering::Relaxed),
376 );
377 append_counter(
378 &mut output,
379 "mnemara_trace_evictions_total",
380 self.trace_evictions.load(Ordering::Relaxed),
381 );
382 append_counter(
383 &mut output,
384 "mnemara_trace_reads_total",
385 self.trace_reads.load(Ordering::Relaxed),
386 );
387 output
388 }
389}
390
391fn append_counter(output: &mut String, name: &str, value: u64) {
392 output.push_str("# TYPE ");
393 output.push_str(name);
394 output.push_str(" counter\n");
395 output.push_str(name);
396 output.push(' ');
397 output.push_str(&value.to_string());
398 output.push('\n');
399}
400
401#[derive(Debug, Clone)]
402pub struct ServerLimits {
403 pub max_http_body_bytes: usize,
404 pub max_batch_upsert_requests: usize,
405 pub max_recall_items: usize,
406 pub max_query_text_bytes: usize,
407 pub max_record_content_bytes: usize,
408 pub max_labels_per_scope: usize,
409 pub max_inflight_reads: usize,
410 pub max_inflight_writes: usize,
411 pub max_inflight_admin: usize,
412 pub max_queued_requests: usize,
413 pub max_tenant_inflight: usize,
414 pub queue_wait_timeout_ms: u64,
415 pub trace_retention: usize,
416}
417
418impl Default for ServerLimits {
419 fn default() -> Self {
420 Self {
421 max_http_body_bytes: 64 * 1024,
422 max_batch_upsert_requests: 128,
423 max_recall_items: 64,
424 max_query_text_bytes: 4 * 1024,
425 max_record_content_bytes: 32 * 1024,
426 max_labels_per_scope: 32,
427 max_inflight_reads: 64,
428 max_inflight_writes: 32,
429 max_inflight_admin: 8,
430 max_queued_requests: 256,
431 max_tenant_inflight: 16,
432 queue_wait_timeout_ms: 2_000,
433 trace_retention: 256,
434 }
435 }
436}
437
438#[derive(Debug, Clone, Default)]
439pub struct AuthConfig {
440 pub bearer_token: Option<String>,
441 pub protect_metrics: bool,
442 pub token_policies: Vec<TokenPolicy>,
443}
444
445#[derive(Debug, Clone, Copy, PartialEq, Eq)]
446pub enum AuthPermission {
447 Read,
448 Write,
449 Admin,
450 Metrics,
451}
452
453#[derive(Debug, Clone, PartialEq, Eq)]
454pub struct TokenPolicy {
455 pub token: String,
456 pub permissions: Vec<AuthPermission>,
457}
458
459impl AuthConfig {
460 fn has_authentication(&self) -> bool {
461 self.bearer_token.is_some() || !self.token_policies.is_empty()
462 }
463
464 fn authorize(&self, authorization_header: Option<&str>, permission: AuthPermission) -> bool {
465 if !self.has_authentication() {
466 return true;
467 }
468
469 let Some(token) = bearer_token_from_header(authorization_header) else {
470 return false;
471 };
472
473 if self.bearer_token.as_deref() == Some(token) {
474 return true;
475 }
476
477 self.token_policies
478 .iter()
479 .any(|policy| policy.token == token && policy.permissions.contains(&permission))
480 }
481}
482
483#[derive(Debug, Clone, serde::Deserialize)]
484pub struct DeleteHttpRequest {
485 pub tenant_id: String,
486 pub namespace: String,
487 pub record_id: String,
488 pub hard_delete: bool,
489 pub audit_reason: String,
490}
491
492#[derive(Debug, Clone, serde::Deserialize)]
493pub struct ArchiveHttpRequest {
494 pub tenant_id: String,
495 pub namespace: String,
496 pub record_id: String,
497 pub dry_run: bool,
498 pub audit_reason: String,
499}
500
501#[derive(Debug, Clone, serde::Deserialize)]
502pub struct SuppressHttpRequest {
503 pub tenant_id: String,
504 pub namespace: String,
505 pub record_id: String,
506 pub dry_run: bool,
507 pub audit_reason: String,
508}
509
510#[derive(Debug, Clone, serde::Deserialize)]
511pub struct RecoverHttpRequest {
512 pub tenant_id: String,
513 pub namespace: String,
514 pub record_id: String,
515 pub dry_run: bool,
516 pub audit_reason: String,
517 pub quality_state: MemoryQualityState,
518 pub historical_state: Option<MemoryHistoricalState>,
519}
520
521pub struct GrpcMemoryService<S> {
522 store: Arc<S>,
523 runtime: ServerRuntime,
524}
525
526impl<S> Clone for GrpcMemoryService<S> {
527 fn clone(&self) -> Self {
528 Self {
529 store: Arc::clone(&self.store),
530 runtime: self.runtime.clone(),
531 }
532 }
533}
534
535#[derive(Debug)]
536struct ServerRuntimeInner {
537 backend: Arc<String>,
538 limits: Arc<ServerLimits>,
539 metrics: Arc<ServerMetrics>,
540 auth: Arc<AuthConfig>,
541 traces: Arc<TraceRegistry>,
542 admission: Arc<AdmissionController>,
543}
544
545#[derive(Debug, Clone)]
546pub struct ServerRuntime {
547 inner: Arc<ServerRuntimeInner>,
548}
549
550impl ServerRuntime {
551 pub fn new(
552 backend: impl Into<String>,
553 limits: ServerLimits,
554 metrics: Arc<ServerMetrics>,
555 auth: AuthConfig,
556 ) -> Self {
557 let admission = AdmissionController::new(AdmissionConfig {
558 max_inflight_reads: limits.max_inflight_reads,
559 max_inflight_writes: limits.max_inflight_writes,
560 max_inflight_admin: limits.max_inflight_admin,
561 max_queued_requests: limits.max_queued_requests,
562 max_tenant_inflight: limits.max_tenant_inflight,
563 queue_wait_timeout_ms: limits.queue_wait_timeout_ms,
564 });
565 Self {
566 inner: Arc::new(ServerRuntimeInner {
567 backend: Arc::new(backend.into()),
568 traces: Arc::new(TraceRegistry::new(limits.trace_retention)),
569 admission: Arc::new(admission),
570 auth: Arc::new(auth),
571 metrics,
572 limits: Arc::new(limits),
573 }),
574 }
575 }
576
577 fn limits(&self) -> &Arc<ServerLimits> {
578 &self.inner.limits
579 }
580
581 fn metrics(&self) -> &Arc<ServerMetrics> {
582 &self.inner.metrics
583 }
584
585 fn backend(&self) -> &Arc<String> {
586 &self.inner.backend
587 }
588
589 fn auth(&self) -> &Arc<AuthConfig> {
590 &self.inner.auth
591 }
592
593 fn traces(&self) -> &Arc<TraceRegistry> {
594 &self.inner.traces
595 }
596
597 fn admission(&self) -> &Arc<AdmissionController> {
598 &self.inner.admission
599 }
600}
601
602impl<S: MemoryStore> GrpcMemoryService<S> {
603 pub fn new(store: Arc<S>) -> Self {
604 let backend = store.as_ref().backend_kind().to_string();
605 Self::with_runtime(
606 store,
607 ServerRuntime::new(
608 backend,
609 ServerLimits::default(),
610 Arc::new(ServerMetrics::default()),
611 AuthConfig::default(),
612 ),
613 )
614 }
615
616 pub fn with_limits(store: Arc<S>, limits: ServerLimits) -> Self {
617 let backend = store.as_ref().backend_kind().to_string();
618 Self::with_runtime(
619 store,
620 ServerRuntime::new(
621 backend,
622 limits,
623 Arc::new(ServerMetrics::default()),
624 AuthConfig::default(),
625 ),
626 )
627 }
628
629 pub fn with_observability(
630 store: Arc<S>,
631 limits: ServerLimits,
632 metrics: Arc<ServerMetrics>,
633 ) -> Self {
634 let backend = store.as_ref().backend_kind().to_string();
635 Self::with_runtime(
636 store,
637 ServerRuntime::new(backend, limits, metrics, AuthConfig::default()),
638 )
639 }
640
641 pub fn with_runtime_config(
642 store: Arc<S>,
643 limits: ServerLimits,
644 metrics: Arc<ServerMetrics>,
645 auth: AuthConfig,
646 ) -> Self {
647 let backend = store.as_ref().backend_kind().to_string();
648 Self::with_runtime(store, ServerRuntime::new(backend, limits, metrics, auth))
649 }
650
651 pub fn with_runtime(store: Arc<S>, runtime: ServerRuntime) -> Self {
652 Self { store, runtime }
653 }
654
655 pub fn into_service(self) -> MemoryServiceServer<Self>
656 where
657 Self: MemoryService,
658 {
659 MemoryServiceServer::new(self)
660 }
661}
662
663pub async fn serve<S>(
664 addr: SocketAddr,
665 store: Arc<S>,
666 limits: ServerLimits,
667 auth: AuthConfig,
668) -> Result<(), tonic::transport::Error>
669where
670 S: MemoryStore + 'static,
671{
672 let backend = store.as_ref().backend_kind().to_string();
673 serve_with_runtime(
674 addr,
675 store,
676 ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
677 )
678 .await
679}
680
681pub async fn serve_with_runtime<S>(
682 addr: SocketAddr,
683 store: Arc<S>,
684 runtime: ServerRuntime,
685) -> Result<(), tonic::transport::Error>
686where
687 S: MemoryStore + 'static,
688{
689 tonic::transport::Server::builder()
690 .add_service(GrpcMemoryService::with_runtime(store, runtime).into_service())
691 .serve(addr)
692 .await
693}
694
695pub fn http_app<S>(store: Arc<S>, limits: ServerLimits, auth: AuthConfig) -> Router
696where
697 S: MemoryStore + 'static,
698{
699 let backend = store.as_ref().backend_kind().to_string();
700 http_app_with_runtime(
701 store,
702 ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
703 )
704}
705
706pub fn http_app_with_metrics<S>(
707 store: Arc<S>,
708 limits: ServerLimits,
709 metrics: Arc<ServerMetrics>,
710 auth: AuthConfig,
711) -> Router
712where
713 S: MemoryStore + 'static,
714{
715 let backend = store.as_ref().backend_kind().to_string();
716 http_app_with_runtime(store, ServerRuntime::new(backend, limits, metrics, auth))
717}
718
719pub fn http_app_with_runtime<S>(store: Arc<S>, runtime: ServerRuntime) -> Router
720where
721 S: MemoryStore + 'static,
722{
723 let limits = Arc::clone(runtime.limits());
724 let ready_store = Arc::clone(&store);
725 let upsert_store = Arc::clone(&store);
726 let batch_upsert_store = Arc::clone(&store);
727 let recall_store = Arc::clone(&store);
728 let snapshot_store = Arc::clone(&store);
729 let stats_store = Arc::clone(&store);
730 let integrity_store = Arc::clone(&store);
731 let repair_store = Arc::clone(&store);
732 let compact_store = Arc::clone(&store);
733 let delete_store = Arc::clone(&store);
734 let archive_store = Arc::clone(&store);
735 let suppress_store = Arc::clone(&store);
736 let recover_store = Arc::clone(&store);
737 let export_store = Arc::clone(&store);
738 let import_store = Arc::clone(&store);
739 let ready_runtime = runtime.clone();
740 let upsert_runtime = runtime.clone();
741 let batch_upsert_runtime = runtime.clone();
742 let recall_runtime = runtime.clone();
743 let snapshot_runtime = runtime.clone();
744 let stats_runtime = runtime.clone();
745 let integrity_runtime = runtime.clone();
746 let repair_runtime = runtime.clone();
747 let compact_runtime = runtime.clone();
748 let delete_runtime = runtime.clone();
749 let archive_runtime = runtime.clone();
750 let suppress_runtime = runtime.clone();
751 let recover_runtime = runtime.clone();
752 let traces_runtime = runtime.clone();
753 let trace_runtime = runtime.clone();
754 let export_runtime = runtime.clone();
755 let import_runtime = runtime.clone();
756 let runtime_status_runtime = runtime.clone();
757 let metrics_metrics = Arc::clone(runtime.metrics());
758 let middleware_limits = Arc::clone(&limits);
759 let middleware_metrics = Arc::clone(runtime.metrics());
760 let middleware_auth = Arc::clone(runtime.auth());
761
762 let health_metrics = Arc::clone(runtime.metrics());
763
764 Router::new()
765 .route(
766 "/healthz",
767 get(move || {
768 let metrics = Arc::clone(&health_metrics);
769 async move { healthz(metrics).await }
770 }),
771 )
772 .route(
773 "/readyz",
774 get(move || {
775 let store = Arc::clone(&ready_store);
776 let runtime = ready_runtime.clone();
777 async move { readyz(store, runtime).await }
778 }),
779 )
780 .route(
781 "/memory/upsert",
782 post(
783 move |headers: HeaderMap, Json(request): Json<UpsertRequest>| {
784 let store = Arc::clone(&upsert_store);
785 let runtime = upsert_runtime.clone();
786 async move { upsert_http(store, request, headers, runtime).await }
787 },
788 ),
789 )
790 .route(
791 "/memory/batch-upsert",
792 post(
793 move |headers: HeaderMap, Json(request): Json<BatchUpsertRequest>| {
794 let store = Arc::clone(&batch_upsert_store);
795 let runtime = batch_upsert_runtime.clone();
796 async move { batch_upsert_http(store, request, headers, runtime).await }
797 },
798 ),
799 )
800 .route(
801 "/memory/recall",
802 post(
803 move |headers: HeaderMap, Json(request): Json<RecallQuery>| {
804 let store = Arc::clone(&recall_store);
805 let runtime = recall_runtime.clone();
806 async move { recall_http(store, request, headers, runtime).await }
807 },
808 ),
809 )
810 .route(
811 "/admin/snapshot",
812 get(move |headers: HeaderMap| {
813 let store = Arc::clone(&snapshot_store);
814 let runtime = snapshot_runtime.clone();
815 async move { snapshot_http(store, headers, runtime).await }
816 }),
817 )
818 .route(
819 "/admin/stats",
820 get(
821 move |headers: HeaderMap, Query(request): Query<StoreStatsRequest>| {
822 let store = Arc::clone(&stats_store);
823 let runtime = stats_runtime.clone();
824 async move { stats_http(store, request, headers, runtime).await }
825 },
826 ),
827 )
828 .route(
829 "/admin/integrity",
830 get(
831 move |headers: HeaderMap, Query(request): Query<IntegrityCheckRequest>| {
832 let store = Arc::clone(&integrity_store);
833 let runtime = integrity_runtime.clone();
834 async move { integrity_http(store, request, headers, runtime).await }
835 },
836 ),
837 )
838 .route(
839 "/admin/repair",
840 post(
841 move |headers: HeaderMap, Json(request): Json<RepairRequest>| {
842 let store = Arc::clone(&repair_store);
843 let runtime = repair_runtime.clone();
844 async move { repair_http(store, request, headers, runtime).await }
845 },
846 ),
847 )
848 .route(
849 "/admin/compact",
850 post(
851 move |headers: HeaderMap, Json(request): Json<CompactionRequest>| {
852 let store = Arc::clone(&compact_store);
853 let runtime = compact_runtime.clone();
854 async move { compact_http(store, request, headers, runtime).await }
855 },
856 ),
857 )
858 .route(
859 "/admin/delete",
860 post(
861 move |headers: HeaderMap, Json(request): Json<DeleteHttpRequest>| {
862 let store = Arc::clone(&delete_store);
863 let runtime = delete_runtime.clone();
864 async move { delete_http(store, request, headers, runtime).await }
865 },
866 ),
867 )
868 .route(
869 "/admin/archive",
870 post(
871 move |headers: HeaderMap, Json(request): Json<ArchiveHttpRequest>| {
872 let store = Arc::clone(&archive_store);
873 let runtime = archive_runtime.clone();
874 async move { archive_http(store, request, headers, runtime).await }
875 },
876 ),
877 )
878 .route(
879 "/admin/suppress",
880 post(
881 move |headers: HeaderMap, Json(request): Json<SuppressHttpRequest>| {
882 let store = Arc::clone(&suppress_store);
883 let runtime = suppress_runtime.clone();
884 async move { suppress_http(store, request, headers, runtime).await }
885 },
886 ),
887 )
888 .route(
889 "/admin/recover",
890 post(
891 move |headers: HeaderMap, Json(request): Json<RecoverHttpRequest>| {
892 let store = Arc::clone(&recover_store);
893 let runtime = recover_runtime.clone();
894 async move { recover_http(store, request, headers, runtime).await }
895 },
896 ),
897 )
898 .route(
899 "/admin/traces",
900 get(move |Query(request): Query<TraceListRequest>| {
901 let runtime = traces_runtime.clone();
902 async move { traces_http(request, runtime).await }
903 }),
904 )
905 .route(
906 "/admin/traces/{trace_id}",
907 get(move |Path(trace_id): Path<String>| {
908 let runtime = trace_runtime.clone();
909 async move { trace_http(trace_id, runtime).await }
910 }),
911 )
912 .route(
913 "/admin/runtime",
914 get(move || {
915 let runtime = runtime_status_runtime.clone();
916 async move { runtime_status_http(runtime).await }
917 }),
918 )
919 .route(
920 "/admin/export",
921 post(
922 move |headers: HeaderMap, Json(request): Json<ExportRequest>| {
923 let store = Arc::clone(&export_store);
924 let runtime = export_runtime.clone();
925 async move { export_http(store, request, headers, runtime).await }
926 },
927 ),
928 )
929 .route(
930 "/admin/import",
931 post(
932 move |headers: HeaderMap, Json(request): Json<ImportRequest>| {
933 let store = Arc::clone(&import_store);
934 let runtime = import_runtime.clone();
935 async move { import_http(store, request, headers, runtime).await }
936 },
937 ),
938 )
939 .route(
940 "/metrics",
941 get(move || {
942 let metrics = Arc::clone(&metrics_metrics);
943 async move { metrics_http(metrics).await }
944 }),
945 )
946 .layer(middleware::from_fn(move |request, next| {
947 let limits = Arc::clone(&middleware_limits);
948 let metrics = Arc::clone(&middleware_metrics);
949 let auth = Arc::clone(&middleware_auth);
950 async move { enforce_http_guardrails(request, next, limits, metrics, auth).await }
951 }))
952}
953
954pub async fn serve_http<S>(
955 addr: SocketAddr,
956 store: Arc<S>,
957 limits: ServerLimits,
958 auth: AuthConfig,
959) -> std::io::Result<()>
960where
961 S: MemoryStore + 'static,
962{
963 let backend = store.as_ref().backend_kind().to_string();
964 serve_http_with_runtime(
965 addr,
966 store,
967 ServerRuntime::new(backend, limits, Arc::new(ServerMetrics::default()), auth),
968 )
969 .await
970}
971
972pub async fn serve_http_with_runtime<S>(
973 addr: SocketAddr,
974 store: Arc<S>,
975 runtime: ServerRuntime,
976) -> std::io::Result<()>
977where
978 S: MemoryStore + 'static,
979{
980 let listener = tokio::net::TcpListener::bind(addr).await?;
981 axum::serve(listener, http_app_with_runtime(store, runtime)).await
982}
983
984async fn healthz(metrics: Arc<ServerMetrics>) -> Json<HealthStatus> {
985 metrics.http_healthz.fetch_add(1, Ordering::Relaxed);
986 Json(HealthStatus { status: "ok" })
987}
988
989async fn readyz<S>(
990 store: Arc<S>,
991 runtime: ServerRuntime,
992) -> Result<Json<ReadyStatus>, (StatusCode, Json<HttpErrorBody>)>
993where
994 S: MemoryStore + 'static,
995{
996 runtime
997 .metrics()
998 .http_readyz
999 .fetch_add(1, Ordering::Relaxed);
1000 let manifest = store.snapshot().await.map_err(map_http_store_error)?;
1001 Ok(Json(ReadyStatus {
1002 status: "ready",
1003 record_count: manifest.record_count,
1004 namespaces: manifest.namespaces,
1005 }))
1006}
1007
1008async fn upsert_http<S>(
1009 store: Arc<S>,
1010 request: UpsertRequest,
1011 headers: HeaderMap,
1012 runtime: ServerRuntime,
1013) -> Result<Json<UpsertReceipt>, (StatusCode, Json<HttpErrorBody>)>
1014where
1015 S: MemoryStore + 'static,
1016{
1017 let started_at_unix_ms = now_unix_ms();
1018 let correlation_id = runtime.traces().next_id("corr");
1019 validate_record_limits(&request.record, runtime.limits().as_ref())
1020 .map_err(map_http_validation_error)?;
1021 let _permit = runtime
1022 .admission()
1023 .acquire(AdmissionClass::Write, Some(&request.record.scope.tenant_id))
1024 .await
1025 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1026 let receipt = store
1027 .upsert(request.clone())
1028 .await
1029 .map_err(map_http_store_error)?;
1030 record_trace(
1031 &runtime,
1032 TraceOperationKind::Upsert,
1033 "http",
1034 Some(request.record.scope.tenant_id),
1035 Some(request.record.scope.namespace),
1036 http_principal(&headers),
1037 started_at_unix_ms,
1038 TraceStatus::Ok,
1039 None,
1040 OperationTraceSummary {
1041 record_id: Some(request.record.id),
1042 ..OperationTraceSummary::default()
1043 },
1044 None,
1045 correlation_id,
1046 );
1047 Ok(Json(receipt))
1048}
1049
1050async fn batch_upsert_http<S>(
1051 store: Arc<S>,
1052 request: BatchUpsertRequest,
1053 headers: HeaderMap,
1054 runtime: ServerRuntime,
1055) -> Result<Json<Vec<UpsertReceipt>>, (StatusCode, Json<HttpErrorBody>)>
1056where
1057 S: MemoryStore + 'static,
1058{
1059 let started_at_unix_ms = now_unix_ms();
1060 let correlation_id = runtime.traces().next_id("corr");
1061 let tenant_id = request
1062 .requests
1063 .first()
1064 .map(|item| item.record.scope.tenant_id.clone());
1065 let namespace = request
1066 .requests
1067 .first()
1068 .map(|item| item.record.scope.namespace.clone());
1069 if request.requests.len() > runtime.limits().max_batch_upsert_requests {
1070 return Err((
1071 StatusCode::BAD_REQUEST,
1072 Json(HttpErrorBody {
1073 error: format!(
1074 "batch upsert request count exceeds configured max of {}",
1075 runtime.limits().max_batch_upsert_requests
1076 ),
1077 }),
1078 ));
1079 }
1080 for item in &request.requests {
1081 validate_record_limits(&item.record, runtime.limits().as_ref())
1082 .map_err(map_http_validation_error)?;
1083 }
1084 let _permit = runtime
1085 .admission()
1086 .acquire(AdmissionClass::Write, tenant_id.as_deref())
1087 .await
1088 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1089 let receipts = store
1090 .batch_upsert(request.clone())
1091 .await
1092 .map_err(map_http_store_error)?;
1093 record_trace(
1094 &runtime,
1095 TraceOperationKind::BatchUpsert,
1096 "http",
1097 tenant_id,
1098 namespace,
1099 http_principal(&headers),
1100 started_at_unix_ms,
1101 TraceStatus::Ok,
1102 None,
1103 OperationTraceSummary {
1104 request_count: Some(request.requests.len() as u32),
1105 ..OperationTraceSummary::default()
1106 },
1107 None,
1108 correlation_id,
1109 );
1110 Ok(Json(receipts))
1111}
1112
1113async fn recall_http<S>(
1114 store: Arc<S>,
1115 request: RecallQuery,
1116 headers: HeaderMap,
1117 runtime: ServerRuntime,
1118) -> Result<Json<RecallResult>, (StatusCode, Json<HttpErrorBody>)>
1119where
1120 S: MemoryStore + 'static,
1121{
1122 let started_at_unix_ms = now_unix_ms();
1123 let correlation_id = runtime.traces().next_id("corr");
1124 validate_recall_limits(&request, runtime.limits().as_ref())
1125 .map_err(map_http_validation_error)?;
1126 let _permit = runtime
1127 .admission()
1128 .acquire(AdmissionClass::Read, Some(&request.scope.tenant_id))
1129 .await
1130 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1131 let mut result = store
1132 .recall(request.clone())
1133 .await
1134 .map_err(map_http_store_error)?;
1135 attach_correlation_id(&mut result, &correlation_id);
1136 record_trace(
1137 &runtime,
1138 TraceOperationKind::Recall,
1139 "http",
1140 Some(request.scope.tenant_id),
1141 Some(request.scope.namespace),
1142 http_principal(&headers),
1143 started_at_unix_ms,
1144 TraceStatus::Ok,
1145 None,
1146 OperationTraceSummary {
1147 query_text: Some(request.query_text),
1148 max_items: Some(request.max_items as u32),
1149 token_budget: request.token_budget.map(|value| value as u32),
1150 ..OperationTraceSummary::default()
1151 },
1152 result.explanation.clone(),
1153 correlation_id,
1154 );
1155 Ok(Json(result))
1156}
1157
1158async fn snapshot_http<S>(
1159 store: Arc<S>,
1160 headers: HeaderMap,
1161 runtime: ServerRuntime,
1162) -> Result<Json<SnapshotManifest>, (StatusCode, Json<HttpErrorBody>)>
1163where
1164 S: MemoryStore + 'static,
1165{
1166 runtime
1167 .metrics()
1168 .http_snapshot
1169 .fetch_add(1, Ordering::Relaxed);
1170 let started_at_unix_ms = now_unix_ms();
1171 let correlation_id = runtime.traces().next_id("corr");
1172 let _permit = runtime
1173 .admission()
1174 .acquire(AdmissionClass::Admin, None)
1175 .await
1176 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1177 let manifest = store.snapshot().await.map_err(map_http_store_error)?;
1178 record_trace(
1179 &runtime,
1180 TraceOperationKind::Snapshot,
1181 "http",
1182 None,
1183 None,
1184 http_principal(&headers),
1185 started_at_unix_ms,
1186 TraceStatus::Ok,
1187 None,
1188 OperationTraceSummary::default(),
1189 None,
1190 correlation_id,
1191 );
1192 Ok(Json(manifest))
1193}
1194
1195async fn stats_http<S>(
1196 store: Arc<S>,
1197 request: StoreStatsRequest,
1198 headers: HeaderMap,
1199 runtime: ServerRuntime,
1200) -> Result<Json<StoreStatsReport>, (StatusCode, Json<HttpErrorBody>)>
1201where
1202 S: MemoryStore + 'static,
1203{
1204 let started_at_unix_ms = now_unix_ms();
1205 let correlation_id = runtime.traces().next_id("corr");
1206 let _permit = runtime
1207 .admission()
1208 .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1209 .await
1210 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1211 let report = store
1212 .stats(request.clone())
1213 .await
1214 .map_err(map_http_store_error)?;
1215 record_trace(
1216 &runtime,
1217 TraceOperationKind::Stats,
1218 "http",
1219 request.tenant_id,
1220 request.namespace,
1221 http_principal(&headers),
1222 started_at_unix_ms,
1223 TraceStatus::Ok,
1224 None,
1225 OperationTraceSummary::default(),
1226 None,
1227 correlation_id,
1228 );
1229 Ok(Json(report))
1230}
1231
1232async fn integrity_http<S>(
1233 store: Arc<S>,
1234 request: IntegrityCheckRequest,
1235 headers: HeaderMap,
1236 runtime: ServerRuntime,
1237) -> Result<Json<IntegrityCheckReport>, (StatusCode, Json<HttpErrorBody>)>
1238where
1239 S: MemoryStore + 'static,
1240{
1241 let started_at_unix_ms = now_unix_ms();
1242 let correlation_id = runtime.traces().next_id("corr");
1243 let _permit = runtime
1244 .admission()
1245 .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1246 .await
1247 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1248 let report = store
1249 .integrity_check(request.clone())
1250 .await
1251 .map_err(map_http_store_error)?;
1252 record_trace(
1253 &runtime,
1254 TraceOperationKind::IntegrityCheck,
1255 "http",
1256 request.tenant_id,
1257 request.namespace,
1258 http_principal(&headers),
1259 started_at_unix_ms,
1260 TraceStatus::Ok,
1261 None,
1262 OperationTraceSummary::default(),
1263 None,
1264 correlation_id,
1265 );
1266 Ok(Json(report))
1267}
1268
1269async fn repair_http<S>(
1270 store: Arc<S>,
1271 request: RepairRequest,
1272 headers: HeaderMap,
1273 runtime: ServerRuntime,
1274) -> Result<Json<RepairReport>, (StatusCode, Json<HttpErrorBody>)>
1275where
1276 S: MemoryStore + 'static,
1277{
1278 let started_at_unix_ms = now_unix_ms();
1279 let correlation_id = runtime.traces().next_id("corr");
1280 let _permit = runtime
1281 .admission()
1282 .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1283 .await
1284 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1285 let report = store
1286 .repair(request.clone())
1287 .await
1288 .map_err(map_http_store_error)?;
1289 record_trace(
1290 &runtime,
1291 TraceOperationKind::Repair,
1292 "http",
1293 request.tenant_id,
1294 request.namespace,
1295 http_principal(&headers),
1296 started_at_unix_ms,
1297 TraceStatus::Ok,
1298 None,
1299 OperationTraceSummary {
1300 dry_run: Some(request.dry_run),
1301 ..OperationTraceSummary::default()
1302 },
1303 None,
1304 correlation_id,
1305 );
1306 Ok(Json(report))
1307}
1308
1309async fn compact_http<S>(
1310 store: Arc<S>,
1311 request: CompactionRequest,
1312 headers: HeaderMap,
1313 runtime: ServerRuntime,
1314) -> Result<Json<CompactionReport>, (StatusCode, Json<HttpErrorBody>)>
1315where
1316 S: MemoryStore + 'static,
1317{
1318 runtime
1319 .metrics()
1320 .http_compact
1321 .fetch_add(1, Ordering::Relaxed);
1322 let started_at_unix_ms = now_unix_ms();
1323 let correlation_id = runtime.traces().next_id("corr");
1324 let _permit = runtime
1325 .admission()
1326 .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1327 .await
1328 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1329 let report = store
1330 .compact(request.clone())
1331 .await
1332 .map_err(map_http_store_error)?;
1333 record_trace(
1334 &runtime,
1335 TraceOperationKind::Compact,
1336 "http",
1337 Some(request.tenant_id),
1338 request.namespace,
1339 http_principal(&headers),
1340 started_at_unix_ms,
1341 TraceStatus::Ok,
1342 None,
1343 OperationTraceSummary {
1344 dry_run: Some(request.dry_run),
1345 ..OperationTraceSummary::default()
1346 },
1347 None,
1348 correlation_id,
1349 );
1350 Ok(Json(report))
1351}
1352
1353async fn delete_http<S>(
1354 store: Arc<S>,
1355 request: DeleteHttpRequest,
1356 headers: HeaderMap,
1357 runtime: ServerRuntime,
1358) -> Result<Json<DeleteReceipt>, (StatusCode, Json<HttpErrorBody>)>
1359where
1360 S: MemoryStore + 'static,
1361{
1362 runtime
1363 .metrics()
1364 .http_delete
1365 .fetch_add(1, Ordering::Relaxed);
1366 let started_at_unix_ms = now_unix_ms();
1367 let correlation_id = runtime.traces().next_id("corr");
1368 let _permit = runtime
1369 .admission()
1370 .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1371 .await
1372 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1373 let receipt = store
1374 .delete(DeleteRequest {
1375 tenant_id: request.tenant_id.clone(),
1376 namespace: request.namespace.clone(),
1377 record_id: request.record_id,
1378 hard_delete: request.hard_delete,
1379 audit_reason: request.audit_reason,
1380 })
1381 .await
1382 .map_err(map_http_store_error)?;
1383 record_trace(
1384 &runtime,
1385 TraceOperationKind::Delete,
1386 "http",
1387 Some(request.tenant_id),
1388 Some(request.namespace),
1389 http_principal(&headers),
1390 started_at_unix_ms,
1391 TraceStatus::Ok,
1392 None,
1393 OperationTraceSummary {
1394 record_id: Some(receipt.record_id.clone()),
1395 ..OperationTraceSummary::default()
1396 },
1397 None,
1398 correlation_id,
1399 );
1400 Ok(Json(receipt))
1401}
1402
1403async fn archive_http<S>(
1404 store: Arc<S>,
1405 request: ArchiveHttpRequest,
1406 headers: HeaderMap,
1407 runtime: ServerRuntime,
1408) -> Result<Json<ArchiveReceipt>, (StatusCode, Json<HttpErrorBody>)>
1409where
1410 S: MemoryStore + 'static,
1411{
1412 runtime
1413 .metrics()
1414 .http_archive
1415 .fetch_add(1, Ordering::Relaxed);
1416 let started_at_unix_ms = now_unix_ms();
1417 let correlation_id = runtime.traces().next_id("corr");
1418 let _permit = runtime
1419 .admission()
1420 .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1421 .await
1422 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1423 let receipt = store
1424 .archive(ArchiveRequest {
1425 tenant_id: request.tenant_id.clone(),
1426 namespace: request.namespace.clone(),
1427 record_id: request.record_id,
1428 dry_run: request.dry_run,
1429 audit_reason: request.audit_reason,
1430 })
1431 .await
1432 .map_err(map_http_store_error)?;
1433 record_trace(
1434 &runtime,
1435 TraceOperationKind::Archive,
1436 "http",
1437 Some(request.tenant_id),
1438 Some(request.namespace),
1439 http_principal(&headers),
1440 started_at_unix_ms,
1441 TraceStatus::Ok,
1442 None,
1443 OperationTraceSummary {
1444 record_id: Some(receipt.record_id.clone()),
1445 dry_run: Some(receipt.dry_run),
1446 ..OperationTraceSummary::default()
1447 },
1448 None,
1449 correlation_id,
1450 );
1451 Ok(Json(receipt))
1452}
1453
1454async fn suppress_http<S>(
1455 store: Arc<S>,
1456 request: SuppressHttpRequest,
1457 headers: HeaderMap,
1458 runtime: ServerRuntime,
1459) -> Result<Json<SuppressReceipt>, (StatusCode, Json<HttpErrorBody>)>
1460where
1461 S: MemoryStore + 'static,
1462{
1463 runtime
1464 .metrics()
1465 .http_suppress
1466 .fetch_add(1, Ordering::Relaxed);
1467 let started_at_unix_ms = now_unix_ms();
1468 let correlation_id = runtime.traces().next_id("corr");
1469 let _permit = runtime
1470 .admission()
1471 .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1472 .await
1473 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1474 let receipt = store
1475 .suppress(SuppressRequest {
1476 tenant_id: request.tenant_id.clone(),
1477 namespace: request.namespace.clone(),
1478 record_id: request.record_id,
1479 dry_run: request.dry_run,
1480 audit_reason: request.audit_reason,
1481 })
1482 .await
1483 .map_err(map_http_store_error)?;
1484 record_trace(
1485 &runtime,
1486 TraceOperationKind::Suppress,
1487 "http",
1488 Some(request.tenant_id),
1489 Some(request.namespace),
1490 http_principal(&headers),
1491 started_at_unix_ms,
1492 TraceStatus::Ok,
1493 None,
1494 OperationTraceSummary {
1495 record_id: Some(receipt.record_id.clone()),
1496 dry_run: Some(receipt.dry_run),
1497 ..OperationTraceSummary::default()
1498 },
1499 None,
1500 correlation_id,
1501 );
1502 Ok(Json(receipt))
1503}
1504
1505async fn recover_http<S>(
1506 store: Arc<S>,
1507 request: RecoverHttpRequest,
1508 headers: HeaderMap,
1509 runtime: ServerRuntime,
1510) -> Result<Json<RecoverReceipt>, (StatusCode, Json<HttpErrorBody>)>
1511where
1512 S: MemoryStore + 'static,
1513{
1514 runtime
1515 .metrics()
1516 .http_recover
1517 .fetch_add(1, Ordering::Relaxed);
1518 let started_at_unix_ms = now_unix_ms();
1519 let correlation_id = runtime.traces().next_id("corr");
1520 let _permit = runtime
1521 .admission()
1522 .acquire(AdmissionClass::Admin, Some(&request.tenant_id))
1523 .await
1524 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1525 let receipt = store
1526 .recover(RecoverRequest {
1527 tenant_id: request.tenant_id.clone(),
1528 namespace: request.namespace.clone(),
1529 record_id: request.record_id,
1530 dry_run: request.dry_run,
1531 audit_reason: request.audit_reason,
1532 quality_state: request.quality_state,
1533 historical_state: request.historical_state,
1534 })
1535 .await
1536 .map_err(map_http_store_error)?;
1537 record_trace(
1538 &runtime,
1539 TraceOperationKind::Recover,
1540 "http",
1541 Some(request.tenant_id),
1542 Some(request.namespace),
1543 http_principal(&headers),
1544 started_at_unix_ms,
1545 TraceStatus::Ok,
1546 None,
1547 OperationTraceSummary {
1548 record_id: Some(receipt.record_id.clone()),
1549 dry_run: Some(receipt.dry_run),
1550 ..OperationTraceSummary::default()
1551 },
1552 None,
1553 correlation_id,
1554 );
1555 Ok(Json(receipt))
1556}
1557
1558async fn traces_http(
1559 request: TraceListRequest,
1560 runtime: ServerRuntime,
1561) -> Result<Json<Vec<OperationTrace>>, (StatusCode, Json<HttpErrorBody>)> {
1562 runtime
1563 .metrics()
1564 .http_traces
1565 .fetch_add(1, Ordering::Relaxed);
1566 runtime
1567 .metrics()
1568 .trace_reads
1569 .fetch_add(1, Ordering::Relaxed);
1570 Ok(Json(runtime.traces().list(&request)))
1571}
1572
1573async fn trace_http(
1574 trace_id: String,
1575 runtime: ServerRuntime,
1576) -> Result<Json<OperationTrace>, (StatusCode, Json<HttpErrorBody>)> {
1577 runtime
1578 .metrics()
1579 .http_traces
1580 .fetch_add(1, Ordering::Relaxed);
1581 runtime
1582 .metrics()
1583 .trace_reads
1584 .fetch_add(1, Ordering::Relaxed);
1585 runtime.traces().get(&trace_id).map(Json).ok_or_else(|| {
1586 (
1587 StatusCode::NOT_FOUND,
1588 Json(HttpErrorBody {
1589 error: format!("trace {trace_id} not found"),
1590 }),
1591 )
1592 })
1593}
1594
1595async fn runtime_status_http(runtime: ServerRuntime) -> Json<ServerRuntimeStatus> {
1596 runtime
1597 .metrics()
1598 .http_runtime
1599 .fetch_add(1, Ordering::Relaxed);
1600 Json(ServerRuntimeStatus {
1601 backend: runtime.backend().as_ref().clone(),
1602 admission: runtime.admission().snapshot(),
1603 traces: runtime.traces().snapshot(),
1604 })
1605}
1606
1607async fn export_http<S>(
1608 store: Arc<S>,
1609 request: ExportRequest,
1610 headers: HeaderMap,
1611 runtime: ServerRuntime,
1612) -> Result<Json<PortableStorePackage>, (StatusCode, Json<HttpErrorBody>)>
1613where
1614 S: MemoryStore + 'static,
1615{
1616 runtime
1617 .metrics()
1618 .http_export
1619 .fetch_add(1, Ordering::Relaxed);
1620 let started_at_unix_ms = now_unix_ms();
1621 let correlation_id = runtime.traces().next_id("corr");
1622 let _permit = runtime
1623 .admission()
1624 .acquire(AdmissionClass::Admin, request.tenant_id.as_deref())
1625 .await
1626 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1627 let package = store
1628 .export(request.clone())
1629 .await
1630 .map_err(map_http_store_error)?;
1631 record_trace(
1632 &runtime,
1633 TraceOperationKind::Export,
1634 "http",
1635 request.tenant_id,
1636 request.namespace,
1637 http_principal(&headers),
1638 started_at_unix_ms,
1639 TraceStatus::Ok,
1640 None,
1641 OperationTraceSummary::default(),
1642 None,
1643 correlation_id,
1644 );
1645 Ok(Json(package))
1646}
1647
1648async fn import_http<S>(
1649 store: Arc<S>,
1650 request: ImportRequest,
1651 headers: HeaderMap,
1652 runtime: ServerRuntime,
1653) -> Result<Json<ImportReport>, (StatusCode, Json<HttpErrorBody>)>
1654where
1655 S: MemoryStore + 'static,
1656{
1657 runtime
1658 .metrics()
1659 .http_import
1660 .fetch_add(1, Ordering::Relaxed);
1661 let started_at_unix_ms = now_unix_ms();
1662 let correlation_id = runtime.traces().next_id("corr");
1663 let tenant_id = request
1664 .package
1665 .records
1666 .first()
1667 .map(|entry| entry.record.scope.tenant_id.clone());
1668 let namespace = request
1669 .package
1670 .records
1671 .first()
1672 .map(|entry| entry.record.scope.namespace.clone());
1673 let _permit = runtime
1674 .admission()
1675 .acquire(AdmissionClass::Admin, tenant_id.as_deref())
1676 .await
1677 .map_err(|error| map_http_admission_error(runtime.metrics().as_ref(), error))?;
1678 let report = store
1679 .import(request.clone())
1680 .await
1681 .map_err(map_http_store_error)?;
1682 record_trace(
1683 &runtime,
1684 TraceOperationKind::Import,
1685 "http",
1686 tenant_id,
1687 namespace,
1688 http_principal(&headers),
1689 started_at_unix_ms,
1690 if report.failed_records.is_empty() {
1691 TraceStatus::Ok
1692 } else {
1693 TraceStatus::Error
1694 },
1695 (!report.failed_records.is_empty())
1696 .then(|| format!("{} import validation failures", report.failed_records.len())),
1697 OperationTraceSummary {
1698 request_count: Some(request.package.records.len() as u32),
1699 dry_run: Some(request.dry_run),
1700 ..OperationTraceSummary::default()
1701 },
1702 None,
1703 correlation_id,
1704 );
1705 Ok(Json(report))
1706}
1707
1708async fn metrics_http(metrics: Arc<ServerMetrics>) -> impl IntoResponse {
1709 metrics.http_metrics.fetch_add(1, Ordering::Relaxed);
1710 (
1711 [(
1712 axum::http::header::CONTENT_TYPE,
1713 "text/plain; version=0.0.4",
1714 )],
1715 metrics.render(),
1716 )
1717}
1718
1719async fn enforce_http_guardrails(
1720 request: AxumRequest,
1721 next: Next,
1722 limits: Arc<ServerLimits>,
1723 metrics: Arc<ServerMetrics>,
1724 auth: Arc<AuthConfig>,
1725) -> Result<axum::response::Response, StatusCode> {
1726 let path = request.uri().path().to_string();
1727 if let Some(content_length) = request.headers().get(axum::http::header::CONTENT_LENGTH)
1728 && let Ok(content_length) = content_length.to_str()
1729 && let Ok(content_length) = content_length.parse::<usize>()
1730 && content_length > limits.max_http_body_bytes
1731 {
1732 metrics
1733 .http_rejected_body_too_large
1734 .fetch_add(1, Ordering::Relaxed);
1735 return Err(StatusCode::PAYLOAD_TOO_LARGE);
1736 }
1737 validate_http_auth(&request, &path, auth.as_ref())?;
1738 Ok(next.run(request).await)
1739}
1740
1741fn http_principal(headers: &HeaderMap) -> Option<String> {
1742 headers
1743 .get(axum::http::header::AUTHORIZATION)
1744 .and_then(|value| value.to_str().ok())
1745 .map(|_| "bearer".to_string())
1746}
1747
1748fn grpc_principal<T>(request: &Request<T>) -> Option<String> {
1749 request
1750 .metadata()
1751 .get("authorization")
1752 .and_then(|value| value.to_str().ok())
1753 .map(|_| "bearer".to_string())
1754}
1755
1756fn map_http_admission_error(
1757 metrics: &ServerMetrics,
1758 error: AdmissionError,
1759) -> (StatusCode, Json<HttpErrorBody>) {
1760 match error {
1761 AdmissionError::QueueFull | AdmissionError::TenantBusy => {
1762 metrics.admission_rejected.fetch_add(1, Ordering::Relaxed);
1763 (
1764 StatusCode::TOO_MANY_REQUESTS,
1765 Json(HttpErrorBody {
1766 error: "request admission rejected".to_string(),
1767 }),
1768 )
1769 }
1770 AdmissionError::TimedOut => {
1771 metrics.admission_timed_out.fetch_add(1, Ordering::Relaxed);
1772 (
1773 StatusCode::TOO_MANY_REQUESTS,
1774 Json(HttpErrorBody {
1775 error: "request admission timed out".to_string(),
1776 }),
1777 )
1778 }
1779 }
1780}
1781
1782fn map_grpc_admission_error(metrics: &ServerMetrics, error: AdmissionError) -> Status {
1783 match error {
1784 AdmissionError::QueueFull | AdmissionError::TenantBusy => {
1785 metrics.admission_rejected.fetch_add(1, Ordering::Relaxed);
1786 Status::resource_exhausted("request admission rejected")
1787 }
1788 AdmissionError::TimedOut => {
1789 metrics.admission_timed_out.fetch_add(1, Ordering::Relaxed);
1790 Status::resource_exhausted("request admission timed out")
1791 }
1792 }
1793}
1794
1795fn attach_correlation_id(result: &mut RecallResult, correlation_id: &str) {
1796 if let Some(explanation) = result.explanation.as_mut() {
1797 if let Some(existing) = explanation.trace_id.replace(correlation_id.to_string()) {
1798 explanation
1799 .policy_notes
1800 .push(format!("planning_trace_id={existing}"));
1801 }
1802 explanation
1803 .policy_notes
1804 .push(format!("correlation_id={correlation_id}"));
1805 }
1806 for hit in &mut result.hits {
1807 if let Some(explanation) = hit.explanation.as_mut() {
1808 if let Some(existing) = explanation.trace_id.replace(correlation_id.to_string()) {
1809 explanation
1810 .policy_notes
1811 .push(format!("planning_trace_id={existing}"));
1812 }
1813 explanation
1814 .policy_notes
1815 .push(format!("correlation_id={correlation_id}"));
1816 }
1817 }
1818}
1819
1820#[allow(clippy::too_many_arguments)]
1821fn record_trace(
1822 runtime: &ServerRuntime,
1823 operation: TraceOperationKind,
1824 transport: &str,
1825 tenant_id: Option<String>,
1826 namespace: Option<String>,
1827 principal: Option<String>,
1828 started_at_unix_ms: u64,
1829 status: TraceStatus,
1830 status_message: Option<String>,
1831 summary: OperationTraceSummary,
1832 recall_explanation: Option<RecallExplanation>,
1833 correlation_id: String,
1834) {
1835 let completed_at_unix_ms = now_unix_ms();
1836 let admission_class = admission_class_for_operation(operation.clone()).to_string();
1837 let planning_trace_id = recall_explanation.as_ref().and_then(|value| {
1838 value
1839 .planning_trace
1840 .as_ref()
1841 .map(|trace| trace.trace_id.clone())
1842 });
1843 let store_span_id = runtime.traces().next_id("store-span");
1844 let evicted = runtime.traces().record(OperationTrace {
1845 trace_id: runtime.traces().next_id("trace"),
1846 correlation_id,
1847 operation,
1848 transport: transport.to_string(),
1849 backend: Some(runtime.backend().as_ref().clone()),
1850 admission_class: Some(admission_class),
1851 tenant_id,
1852 namespace,
1853 principal,
1854 store_span_id: Some(store_span_id),
1855 planning_trace_id,
1856 started_at_unix_ms,
1857 completed_at_unix_ms,
1858 latency_ms: completed_at_unix_ms.saturating_sub(started_at_unix_ms),
1859 status,
1860 status_message,
1861 summary,
1862 recall_explanation,
1863 });
1864 runtime
1865 .metrics()
1866 .trace_records
1867 .fetch_add(1, Ordering::Relaxed);
1868 if evicted {
1869 runtime
1870 .metrics()
1871 .trace_evictions
1872 .fetch_add(1, Ordering::Relaxed);
1873 }
1874}
1875
1876fn admission_class_for_operation(operation: TraceOperationKind) -> &'static str {
1877 match operation {
1878 TraceOperationKind::Recall | TraceOperationKind::Snapshot | TraceOperationKind::Stats => {
1879 "read"
1880 }
1881 TraceOperationKind::Upsert
1882 | TraceOperationKind::BatchUpsert
1883 | TraceOperationKind::Compact
1884 | TraceOperationKind::Delete
1885 | TraceOperationKind::Archive
1886 | TraceOperationKind::Suppress
1887 | TraceOperationKind::Recover => "write",
1888 TraceOperationKind::IntegrityCheck
1889 | TraceOperationKind::Repair
1890 | TraceOperationKind::Export
1891 | TraceOperationKind::Import => "admin",
1892 }
1893}
1894
1895fn validate_http_auth(
1896 request: &AxumRequest,
1897 path: &str,
1898 auth: &AuthConfig,
1899) -> Result<(), StatusCode> {
1900 if path == "/healthz" || path == "/readyz" || (path == "/metrics" && !auth.protect_metrics) {
1901 return Ok(());
1902 }
1903 let provided = request
1904 .headers()
1905 .get(axum::http::header::AUTHORIZATION)
1906 .and_then(|value| value.to_str().ok());
1907 let permission = match path {
1908 "/metrics" => AuthPermission::Metrics,
1909 "/memory/recall" => AuthPermission::Read,
1910 "/memory/upsert" | "/memory/batch-upsert" => AuthPermission::Write,
1911 "/admin/stats" | "/admin/integrity" | "/admin/repair" | "/admin/snapshot"
1912 | "/admin/compact" | "/admin/delete" | "/admin/archive" | "/admin/suppress"
1913 | "/admin/recover" | "/admin/runtime" | "/admin/export" | "/admin/import" => {
1914 AuthPermission::Admin
1915 }
1916 _ => AuthPermission::Admin,
1917 };
1918 if path.starts_with("/admin/traces") {
1919 return auth
1920 .authorize(provided, AuthPermission::Admin)
1921 .then_some(())
1922 .ok_or(StatusCode::UNAUTHORIZED);
1923 }
1924 if auth.authorize(provided, permission) {
1925 Ok(())
1926 } else {
1927 Err(StatusCode::UNAUTHORIZED)
1928 }
1929}
1930
1931fn validate_grpc_auth<T>(
1932 request: &Request<T>,
1933 auth: &AuthConfig,
1934 permission: AuthPermission,
1935) -> Result<(), Status> {
1936 let provided = request
1937 .metadata()
1938 .get("authorization")
1939 .and_then(|value| value.to_str().ok());
1940 if auth.authorize(provided, permission) {
1941 Ok(())
1942 } else {
1943 Err(Status::unauthenticated("missing or invalid bearer token"))
1944 }
1945}
1946
1947fn bearer_token_from_header(header: Option<&str>) -> Option<&str> {
1948 header.and_then(|value| value.strip_prefix("Bearer "))
1949}
1950
1951fn invalid_argument(message: impl Into<String>) -> Status {
1952 Status::invalid_argument(message.into())
1953}
1954
1955fn internal_status(message: impl Into<String>) -> Status {
1956 Status::internal(message.into())
1957}
1958
1959fn trust_level_from_proto(value: &str) -> Result<MemoryTrustLevel, Status> {
1960 match value {
1961 "" | "derived" => Ok(MemoryTrustLevel::Derived),
1962 "untrusted" => Ok(MemoryTrustLevel::Untrusted),
1963 "observed" => Ok(MemoryTrustLevel::Observed),
1964 "verified" => Ok(MemoryTrustLevel::Verified),
1965 "pinned" => Ok(MemoryTrustLevel::Pinned),
1966 other => Err(invalid_argument(format!("unknown trust level: {other}"))),
1967 }
1968}
1969
1970fn trust_level_to_proto(value: MemoryTrustLevel) -> String {
1971 match value {
1972 MemoryTrustLevel::Untrusted => "untrusted",
1973 MemoryTrustLevel::Observed => "observed",
1974 MemoryTrustLevel::Derived => "derived",
1975 MemoryTrustLevel::Verified => "verified",
1976 MemoryTrustLevel::Pinned => "pinned",
1977 }
1978 .to_string()
1979}
1980
1981fn record_kind_from_proto(value: &str) -> Result<MemoryRecordKind, Status> {
1982 match value {
1983 "episodic" => Ok(MemoryRecordKind::Episodic),
1984 "summary" => Ok(MemoryRecordKind::Summary),
1985 "fact" => Ok(MemoryRecordKind::Fact),
1986 "preference" => Ok(MemoryRecordKind::Preference),
1987 "task" => Ok(MemoryRecordKind::Task),
1988 "artifact" => Ok(MemoryRecordKind::Artifact),
1989 "hypothesis" => Ok(MemoryRecordKind::Hypothesis),
1990 other => Err(invalid_argument(format!("unknown record kind: {other}"))),
1991 }
1992}
1993
1994fn record_kind_to_proto(value: MemoryRecordKind) -> String {
1995 match value {
1996 MemoryRecordKind::Episodic => "episodic",
1997 MemoryRecordKind::Summary => "summary",
1998 MemoryRecordKind::Fact => "fact",
1999 MemoryRecordKind::Preference => "preference",
2000 MemoryRecordKind::Task => "task",
2001 MemoryRecordKind::Artifact => "artifact",
2002 MemoryRecordKind::Hypothesis => "hypothesis",
2003 }
2004 .to_string()
2005}
2006
2007fn quality_state_from_proto(value: &str) -> Result<MemoryQualityState, Status> {
2008 match value {
2009 "draft" => Ok(MemoryQualityState::Draft),
2010 "active" => Ok(MemoryQualityState::Active),
2011 "verified" => Ok(MemoryQualityState::Verified),
2012 "archived" => Ok(MemoryQualityState::Archived),
2013 "suppressed" => Ok(MemoryQualityState::Suppressed),
2014 "deleted" => Ok(MemoryQualityState::Deleted),
2015 other => Err(invalid_argument(format!("unknown quality state: {other}"))),
2016 }
2017}
2018
2019fn quality_state_to_proto(value: MemoryQualityState) -> String {
2020 match value {
2021 MemoryQualityState::Draft => "draft",
2022 MemoryQualityState::Active => "active",
2023 MemoryQualityState::Verified => "verified",
2024 MemoryQualityState::Archived => "archived",
2025 MemoryQualityState::Suppressed => "suppressed",
2026 MemoryQualityState::Deleted => "deleted",
2027 }
2028 .to_string()
2029}
2030
2031fn scope_from_proto(scope: ProtoMemoryScope) -> Result<MemoryScope, Status> {
2032 Ok(MemoryScope {
2033 tenant_id: scope.tenant_id,
2034 namespace: scope.namespace,
2035 actor_id: scope.actor_id,
2036 conversation_id: scope.conversation_id,
2037 session_id: scope.session_id,
2038 source: scope.source,
2039 labels: scope.labels,
2040 trust_level: trust_level_from_proto(&scope.trust_level)?,
2041 })
2042}
2043
2044fn scope_to_proto(scope: MemoryScope) -> ProtoMemoryScope {
2045 ProtoMemoryScope {
2046 tenant_id: scope.tenant_id,
2047 namespace: scope.namespace,
2048 actor_id: scope.actor_id,
2049 conversation_id: scope.conversation_id,
2050 session_id: scope.session_id,
2051 source: scope.source,
2052 labels: scope.labels,
2053 trust_level: trust_level_to_proto(scope.trust_level),
2054 }
2055}
2056
2057fn artifact_from_proto(value: ProtoArtifactPointer) -> ArtifactPointer {
2058 ArtifactPointer {
2059 uri: value.uri,
2060 media_type: value.media_type,
2061 checksum: value.checksum,
2062 }
2063}
2064
2065fn artifact_to_proto(value: ArtifactPointer) -> ProtoArtifactPointer {
2066 ProtoArtifactPointer {
2067 uri: value.uri,
2068 media_type: value.media_type,
2069 checksum: value.checksum,
2070 }
2071}
2072
2073fn continuity_state_from_proto(value: &str) -> Result<EpisodeContinuityState, Status> {
2074 match value {
2075 "" | "open" => Ok(EpisodeContinuityState::Open),
2076 "resolved" => Ok(EpisodeContinuityState::Resolved),
2077 "superseded" => Ok(EpisodeContinuityState::Superseded),
2078 "abandoned" => Ok(EpisodeContinuityState::Abandoned),
2079 other => Err(invalid_argument(format!(
2080 "unknown continuity state: {other}"
2081 ))),
2082 }
2083}
2084
2085fn continuity_state_to_proto(value: EpisodeContinuityState) -> String {
2086 match value {
2087 EpisodeContinuityState::Open => "open",
2088 EpisodeContinuityState::Resolved => "resolved",
2089 EpisodeContinuityState::Superseded => "superseded",
2090 EpisodeContinuityState::Abandoned => "abandoned",
2091 }
2092 .to_string()
2093}
2094
2095fn affective_provenance_from_proto(value: &str) -> Result<AffectiveAnnotationProvenance, Status> {
2096 match value {
2097 "" | "authored" => Ok(AffectiveAnnotationProvenance::Authored),
2098 "imported" => Ok(AffectiveAnnotationProvenance::Imported),
2099 "derived" => Ok(AffectiveAnnotationProvenance::Derived),
2100 other => Err(invalid_argument(format!(
2101 "unknown affective provenance: {other}"
2102 ))),
2103 }
2104}
2105
2106fn affective_provenance_to_proto(value: AffectiveAnnotationProvenance) -> String {
2107 match value {
2108 AffectiveAnnotationProvenance::Authored => "authored",
2109 AffectiveAnnotationProvenance::Imported => "imported",
2110 AffectiveAnnotationProvenance::Derived => "derived",
2111 }
2112 .to_string()
2113}
2114
2115fn temporal_order_from_proto(value: &str) -> Result<RecallTemporalOrder, Status> {
2116 match value {
2117 "" | "relevance" => Ok(RecallTemporalOrder::Relevance),
2118 "chronological_asc" => Ok(RecallTemporalOrder::ChronologicalAsc),
2119 "chronological_desc" => Ok(RecallTemporalOrder::ChronologicalDesc),
2120 other => Err(invalid_argument(format!("unknown temporal order: {other}"))),
2121 }
2122}
2123
2124fn planning_profile_from_proto(value: &str) -> Result<RecallPlanningProfile, Status> {
2125 match value {
2126 "" | "fast_path" => Ok(RecallPlanningProfile::FastPath),
2127 "continuity_aware" => Ok(RecallPlanningProfile::ContinuityAware),
2128 other => Err(invalid_argument(format!(
2129 "unknown planning profile: {other}"
2130 ))),
2131 }
2132}
2133
2134fn planning_profile_to_proto(value: RecallPlanningProfile) -> String {
2135 match value {
2136 RecallPlanningProfile::FastPath => "fast_path",
2137 RecallPlanningProfile::ContinuityAware => "continuity_aware",
2138 }
2139 .to_string()
2140}
2141
2142fn planner_stage_to_proto(value: RecallPlannerStage) -> String {
2143 match value {
2144 RecallPlannerStage::CandidateGeneration => "candidate_generation",
2145 RecallPlannerStage::GraphExpansion => "graph_expansion",
2146 RecallPlannerStage::Selection => "selection",
2147 }
2148 .to_string()
2149}
2150
2151fn candidate_source_to_proto(value: RecallCandidateSource) -> String {
2152 match value {
2153 RecallCandidateSource::Lexical => "lexical",
2154 RecallCandidateSource::Semantic => "semantic",
2155 RecallCandidateSource::Metadata => "metadata",
2156 RecallCandidateSource::Episode => "episode",
2157 RecallCandidateSource::Graph => "graph",
2158 RecallCandidateSource::Temporal => "temporal",
2159 RecallCandidateSource::Provenance => "provenance",
2160 }
2161 .to_string()
2162}
2163
2164fn affective_annotation_from_proto(
2165 value: ProtoAffectiveAnnotation,
2166) -> Result<AffectiveAnnotation, Status> {
2167 Ok(AffectiveAnnotation {
2168 tone: value.tone,
2169 sentiment: value.sentiment,
2170 urgency: value.urgency,
2171 confidence: value.confidence,
2172 tension: value.tension,
2173 provenance: affective_provenance_from_proto(&value.provenance)?,
2174 })
2175}
2176
2177fn affective_annotation_to_proto(value: AffectiveAnnotation) -> ProtoAffectiveAnnotation {
2178 ProtoAffectiveAnnotation {
2179 tone: value.tone,
2180 sentiment: value.sentiment,
2181 urgency: value.urgency,
2182 confidence: value.confidence,
2183 tension: value.tension,
2184 provenance: affective_provenance_to_proto(value.provenance),
2185 }
2186}
2187
2188fn historical_state_from_proto(value: &str) -> Result<MemoryHistoricalState, Status> {
2189 match value {
2190 "" | "current" => Ok(MemoryHistoricalState::Current),
2191 "historical" => Ok(MemoryHistoricalState::Historical),
2192 "superseded" => Ok(MemoryHistoricalState::Superseded),
2193 other => Err(invalid_argument(format!(
2194 "unknown historical state: {other}"
2195 ))),
2196 }
2197}
2198
2199fn historical_state_to_proto(value: MemoryHistoricalState) -> String {
2200 match value {
2201 MemoryHistoricalState::Current => "current",
2202 MemoryHistoricalState::Historical => "historical",
2203 MemoryHistoricalState::Superseded => "superseded",
2204 }
2205 .to_string()
2206}
2207
2208fn lineage_relation_from_proto(value: &str) -> Result<LineageRelationKind, Status> {
2209 match value {
2210 "" | "derived_from" => Ok(LineageRelationKind::DerivedFrom),
2211 "consolidated_from" => Ok(LineageRelationKind::ConsolidatedFrom),
2212 "supersedes" => Ok(LineageRelationKind::Supersedes),
2213 "superseded_by" => Ok(LineageRelationKind::SupersededBy),
2214 "conflicts_with" => Ok(LineageRelationKind::ConflictsWith),
2215 other => Err(invalid_argument(format!(
2216 "unknown lineage relation: {other}"
2217 ))),
2218 }
2219}
2220
2221fn lineage_relation_to_proto(value: LineageRelationKind) -> String {
2222 match value {
2223 LineageRelationKind::DerivedFrom => "derived_from",
2224 LineageRelationKind::ConsolidatedFrom => "consolidated_from",
2225 LineageRelationKind::Supersedes => "supersedes",
2226 LineageRelationKind::SupersededBy => "superseded_by",
2227 LineageRelationKind::ConflictsWith => "conflicts_with",
2228 }
2229 .to_string()
2230}
2231
2232fn lineage_link_from_proto(value: ProtoLineageLink) -> Result<LineageLink, Status> {
2233 Ok(LineageLink {
2234 record_id: value.record_id,
2235 relation: lineage_relation_from_proto(&value.relation)?,
2236 confidence: value.confidence,
2237 })
2238}
2239
2240fn lineage_link_to_proto(value: LineageLink) -> ProtoLineageLink {
2241 ProtoLineageLink {
2242 record_id: value.record_id,
2243 relation: lineage_relation_to_proto(value.relation),
2244 confidence: value.confidence,
2245 }
2246}
2247
2248fn conflict_review_state_from_proto(value: &str) -> Result<ConflictReviewState, Status> {
2249 match value {
2250 "" | "none" => Ok(ConflictReviewState::None),
2251 "potential_conflict" => Ok(ConflictReviewState::PotentialConflict),
2252 "under_review" => Ok(ConflictReviewState::UnderReview),
2253 "resolved" => Ok(ConflictReviewState::Resolved),
2254 "dismissed" => Ok(ConflictReviewState::Dismissed),
2255 other => Err(invalid_argument(format!(
2256 "unknown conflict review state: {other}"
2257 ))),
2258 }
2259}
2260
2261fn conflict_review_state_to_proto(value: ConflictReviewState) -> String {
2262 match value {
2263 ConflictReviewState::None => "none",
2264 ConflictReviewState::PotentialConflict => "potential_conflict",
2265 ConflictReviewState::UnderReview => "under_review",
2266 ConflictReviewState::Resolved => "resolved",
2267 ConflictReviewState::Dismissed => "dismissed",
2268 }
2269 .to_string()
2270}
2271
2272fn conflict_resolution_kind_from_proto(value: &str) -> Result<ConflictResolutionKind, Status> {
2273 match value {
2274 "" | "none" => Ok(ConflictResolutionKind::None),
2275 "accepted" => Ok(ConflictResolutionKind::Accepted),
2276 "rejected" => Ok(ConflictResolutionKind::Rejected),
2277 "superseded" => Ok(ConflictResolutionKind::Superseded),
2278 "merged" => Ok(ConflictResolutionKind::Merged),
2279 other => Err(invalid_argument(format!(
2280 "unknown conflict resolution kind: {other}"
2281 ))),
2282 }
2283}
2284
2285fn conflict_resolution_kind_to_proto(value: ConflictResolutionKind) -> String {
2286 match value {
2287 ConflictResolutionKind::None => "none",
2288 ConflictResolutionKind::Accepted => "accepted",
2289 ConflictResolutionKind::Rejected => "rejected",
2290 ConflictResolutionKind::Superseded => "superseded",
2291 ConflictResolutionKind::Merged => "merged",
2292 }
2293 .to_string()
2294}
2295
2296fn conflict_annotation_from_proto(
2297 value: ProtoConflictAnnotation,
2298) -> Result<ConflictAnnotation, Status> {
2299 Ok(ConflictAnnotation {
2300 state: conflict_review_state_from_proto(&value.state)?,
2301 conflicting_record_ids: value.conflicting_record_ids,
2302 drift_score: value.drift_score,
2303 resolution: conflict_resolution_kind_from_proto(&value.resolution)?,
2304 resolved_by: value.resolved_by,
2305 resolved_at_unix_ms: value.resolved_at_unix_ms,
2306 note: value.note,
2307 })
2308}
2309
2310fn conflict_annotation_to_proto(value: ConflictAnnotation) -> ProtoConflictAnnotation {
2311 ProtoConflictAnnotation {
2312 state: conflict_review_state_to_proto(value.state),
2313 conflicting_record_ids: value.conflicting_record_ids,
2314 drift_score: value.drift_score,
2315 resolution: conflict_resolution_kind_to_proto(value.resolution),
2316 resolved_by: value.resolved_by,
2317 resolved_at_unix_ms: value.resolved_at_unix_ms,
2318 note: value.note,
2319 }
2320}
2321
2322fn historical_mode_from_proto(value: &str) -> Result<RecallHistoricalMode, Status> {
2323 match value {
2324 "" | "current_only" => Ok(RecallHistoricalMode::CurrentOnly),
2325 "include_historical" => Ok(RecallHistoricalMode::IncludeHistorical),
2326 "historical_only" => Ok(RecallHistoricalMode::HistoricalOnly),
2327 other => Err(invalid_argument(format!(
2328 "unknown historical mode: {other}"
2329 ))),
2330 }
2331}
2332
2333fn episode_context_from_proto(value: ProtoEpisodeContext) -> Result<EpisodeContext, Status> {
2334 Ok(EpisodeContext {
2335 schema_version: if value.schema_version == 0 {
2336 EPISODE_SCHEMA_VERSION
2337 } else {
2338 value.schema_version
2339 },
2340 episode_id: value.episode_id,
2341 summary: value.summary,
2342 continuity_state: continuity_state_from_proto(&value.continuity_state)?,
2343 actor_ids: value.actor_ids,
2344 goal: value.goal,
2345 outcome: value.outcome,
2346 started_at_unix_ms: value.started_at_unix_ms,
2347 ended_at_unix_ms: value.ended_at_unix_ms,
2348 last_active_unix_ms: value.last_active_unix_ms,
2349 recurrence_key: value.recurrence_key,
2350 recurrence_interval_ms: value.recurrence_interval_ms,
2351 boundary_label: value.boundary_label,
2352 previous_record_id: value.previous_record_id,
2353 next_record_id: value.next_record_id,
2354 causal_record_ids: value.causal_record_ids,
2355 related_record_ids: value.related_record_ids,
2356 linked_artifact_uris: value.linked_artifact_uris,
2357 salience: value
2358 .salience
2359 .map_or_else(EpisodeSalience::default, |salience| EpisodeSalience {
2360 reuse_count: salience.reuse_count,
2361 novelty_score: salience.novelty_score,
2362 goal_relevance: salience.goal_relevance,
2363 unresolved_weight: salience.unresolved_weight,
2364 }),
2365 affective: value
2366 .affective
2367 .map(affective_annotation_from_proto)
2368 .transpose()?,
2369 })
2370}
2371
2372fn episode_context_to_proto(value: EpisodeContext) -> ProtoEpisodeContext {
2373 ProtoEpisodeContext {
2374 schema_version: value.schema_version,
2375 episode_id: value.episode_id,
2376 summary: value.summary,
2377 continuity_state: continuity_state_to_proto(value.continuity_state),
2378 actor_ids: value.actor_ids,
2379 goal: value.goal,
2380 outcome: value.outcome,
2381 started_at_unix_ms: value.started_at_unix_ms,
2382 ended_at_unix_ms: value.ended_at_unix_ms,
2383 last_active_unix_ms: value.last_active_unix_ms,
2384 recurrence_key: value.recurrence_key,
2385 recurrence_interval_ms: value.recurrence_interval_ms,
2386 boundary_label: value.boundary_label,
2387 previous_record_id: value.previous_record_id,
2388 next_record_id: value.next_record_id,
2389 causal_record_ids: value.causal_record_ids,
2390 related_record_ids: value.related_record_ids,
2391 linked_artifact_uris: value.linked_artifact_uris,
2392 salience: Some(ProtoEpisodeSalience {
2393 reuse_count: value.salience.reuse_count,
2394 novelty_score: value.salience.novelty_score,
2395 goal_relevance: value.salience.goal_relevance,
2396 unresolved_weight: value.salience.unresolved_weight,
2397 }),
2398 affective: value.affective.map(affective_annotation_to_proto),
2399 }
2400}
2401
2402fn record_from_proto(record: ProtoMemoryRecord) -> Result<MemoryRecord, Status> {
2403 Ok(MemoryRecord {
2404 id: record.id,
2405 scope: scope_from_proto(
2406 record
2407 .scope
2408 .ok_or_else(|| invalid_argument("memory record scope is required"))?,
2409 )?,
2410 kind: record_kind_from_proto(&record.kind)?,
2411 content: record.content,
2412 summary: record.summary,
2413 source_id: record.source_id,
2414 metadata: record.metadata.into_iter().collect(),
2415 quality_state: quality_state_from_proto(&record.quality_state)?,
2416 created_at_unix_ms: record.created_at_unix_ms,
2417 updated_at_unix_ms: record.updated_at_unix_ms,
2418 expires_at_unix_ms: record.expires_at_unix_ms,
2419 importance_score: record.importance_score,
2420 artifact: record.artifact.map(artifact_from_proto),
2421 episode: record.episode.map(episode_context_from_proto).transpose()?,
2422 historical_state: historical_state_from_proto(
2423 record.historical_state.as_deref().unwrap_or(""),
2424 )?,
2425 lineage: record
2426 .lineage
2427 .into_iter()
2428 .map(lineage_link_from_proto)
2429 .collect::<Result<Vec<_>, _>>()?,
2430 conflict: record
2431 .conflict
2432 .map(conflict_annotation_from_proto)
2433 .transpose()?,
2434 })
2435}
2436
2437fn record_to_proto(record: MemoryRecord) -> ProtoMemoryRecord {
2438 ProtoMemoryRecord {
2439 id: record.id,
2440 scope: Some(scope_to_proto(record.scope)),
2441 kind: record_kind_to_proto(record.kind),
2442 content: record.content,
2443 summary: record.summary,
2444 metadata: record.metadata.into_iter().collect(),
2445 quality_state: quality_state_to_proto(record.quality_state),
2446 created_at_unix_ms: record.created_at_unix_ms,
2447 updated_at_unix_ms: record.updated_at_unix_ms,
2448 expires_at_unix_ms: record.expires_at_unix_ms,
2449 importance_score: record.importance_score,
2450 source_id: record.source_id,
2451 artifact: record.artifact.map(artifact_to_proto),
2452 episode: record.episode.map(episode_context_to_proto),
2453 historical_state: Some(historical_state_to_proto(record.historical_state)),
2454 lineage: record
2455 .lineage
2456 .into_iter()
2457 .map(lineage_link_to_proto)
2458 .collect(),
2459 conflict: record.conflict.map(conflict_annotation_to_proto),
2460 }
2461}
2462
2463fn recall_explanation_to_proto(value: RecallExplanation) -> ProtoRecallExplanation {
2464 ProtoRecallExplanation {
2465 selected_channels: value.selected_channels,
2466 policy_notes: value.policy_notes,
2467 trace_id: value.trace_id,
2468 planning_trace: value.planning_trace.map(recall_planning_trace_to_proto),
2469 scorer_kind: value
2470 .scorer_kind
2471 .map(recall_scorer_kind_to_proto)
2472 .unwrap_or(ProtoRecallScorerKind::Unspecified as i32),
2473 scoring_profile: value
2474 .scoring_profile
2475 .map(recall_scoring_profile_to_proto)
2476 .unwrap_or(ProtoRecallScoringProfile::Unspecified as i32),
2477 planning_profile: value.planning_profile.map(planning_profile_to_proto),
2478 policy_profile: value
2479 .policy_profile
2480 .map(recall_policy_profile_to_proto)
2481 .unwrap_or(ProtoRecallPolicyProfile::Unspecified as i32),
2482 }
2483}
2484
2485fn recall_scorer_kind_to_proto(value: RecallScorerKind) -> i32 {
2486 match value {
2487 RecallScorerKind::Profile => ProtoRecallScorerKind::Profile as i32,
2488 RecallScorerKind::Curated => ProtoRecallScorerKind::Curated as i32,
2489 }
2490}
2491
2492fn recall_scoring_profile_to_proto(value: RecallScoringProfile) -> i32 {
2493 match value {
2494 RecallScoringProfile::Balanced => ProtoRecallScoringProfile::Balanced as i32,
2495 RecallScoringProfile::LexicalFirst => ProtoRecallScoringProfile::LexicalFirst as i32,
2496 RecallScoringProfile::ImportanceFirst => ProtoRecallScoringProfile::ImportanceFirst as i32,
2497 }
2498}
2499
2500fn recall_policy_profile_to_proto(value: RecallPolicyProfile) -> i32 {
2501 match value {
2502 RecallPolicyProfile::General => ProtoRecallPolicyProfile::General as i32,
2503 RecallPolicyProfile::Support => ProtoRecallPolicyProfile::Support as i32,
2504 RecallPolicyProfile::Research => ProtoRecallPolicyProfile::Research as i32,
2505 RecallPolicyProfile::Assistant => ProtoRecallPolicyProfile::Assistant as i32,
2506 RecallPolicyProfile::AutonomousAgent => ProtoRecallPolicyProfile::AutonomousAgent as i32,
2507 }
2508}
2509
2510fn embedding_provider_kind_to_proto(value: EmbeddingProviderKind) -> i32 {
2511 match value {
2512 EmbeddingProviderKind::Disabled => ProtoEmbeddingProviderKind::Disabled as i32,
2513 EmbeddingProviderKind::DeterministicLocal => {
2514 ProtoEmbeddingProviderKind::DeterministicLocal as i32
2515 }
2516 }
2517}
2518
2519fn engine_tuning_info_to_proto(value: EngineTuningInfo) -> ProtoEngineTuningInfo {
2520 ProtoEngineTuningInfo {
2521 recall_scorer_kind: recall_scorer_kind_to_proto(value.recall_scorer_kind),
2522 recall_scoring_profile: recall_scoring_profile_to_proto(value.recall_scoring_profile),
2523 embedding_provider_kind: embedding_provider_kind_to_proto(value.embedding_provider_kind),
2524 embedding_dimensions: value.embedding_dimensions as u64,
2525 graph_expansion_max_hops: u32::from(value.graph_expansion_max_hops),
2526 compaction_summarize_after_record_count: value.compaction_summarize_after_record_count
2527 as u64,
2528 compaction_cold_archive_after_days: value.compaction_cold_archive_after_days,
2529 compaction_cold_archive_importance_threshold_per_mille: u32::from(
2530 value.compaction_cold_archive_importance_threshold_per_mille,
2531 ),
2532 recall_planning_profile: Some(planning_profile_to_proto(value.recall_planning_profile)),
2533 recall_policy_profile: recall_policy_profile_to_proto(value.recall_policy_profile),
2534 }
2535}
2536
2537fn engine_tuning_info_from_proto(value: ProtoEngineTuningInfo) -> Result<EngineTuningInfo, Status> {
2538 Ok(EngineTuningInfo {
2539 recall_scorer_kind: match ProtoRecallScorerKind::try_from(value.recall_scorer_kind)
2540 .unwrap_or(ProtoRecallScorerKind::Unspecified)
2541 {
2542 ProtoRecallScorerKind::Profile => RecallScorerKind::Profile,
2543 ProtoRecallScorerKind::Curated => RecallScorerKind::Curated,
2544 ProtoRecallScorerKind::Unspecified => {
2545 return Err(invalid_argument("engine recall scorer kind is required"));
2546 }
2547 },
2548 recall_scoring_profile: match ProtoRecallScoringProfile::try_from(
2549 value.recall_scoring_profile,
2550 )
2551 .unwrap_or(ProtoRecallScoringProfile::Unspecified)
2552 {
2553 ProtoRecallScoringProfile::Balanced => RecallScoringProfile::Balanced,
2554 ProtoRecallScoringProfile::LexicalFirst => RecallScoringProfile::LexicalFirst,
2555 ProtoRecallScoringProfile::ImportanceFirst => RecallScoringProfile::ImportanceFirst,
2556 ProtoRecallScoringProfile::Unspecified => {
2557 return Err(invalid_argument(
2558 "engine recall scoring profile is required",
2559 ));
2560 }
2561 },
2562 recall_planning_profile: planning_profile_from_proto(
2563 value.recall_planning_profile.as_deref().unwrap_or(""),
2564 )?,
2565 recall_policy_profile: match ProtoRecallPolicyProfile::try_from(value.recall_policy_profile)
2566 .unwrap_or(ProtoRecallPolicyProfile::Unspecified)
2567 {
2568 ProtoRecallPolicyProfile::General => RecallPolicyProfile::General,
2569 ProtoRecallPolicyProfile::Support => RecallPolicyProfile::Support,
2570 ProtoRecallPolicyProfile::Research => RecallPolicyProfile::Research,
2571 ProtoRecallPolicyProfile::Assistant => RecallPolicyProfile::Assistant,
2572 ProtoRecallPolicyProfile::AutonomousAgent => RecallPolicyProfile::AutonomousAgent,
2573 ProtoRecallPolicyProfile::Unspecified => RecallPolicyProfile::General,
2574 },
2575 embedding_provider_kind: match ProtoEmbeddingProviderKind::try_from(
2576 value.embedding_provider_kind,
2577 )
2578 .unwrap_or(ProtoEmbeddingProviderKind::Unspecified)
2579 {
2580 ProtoEmbeddingProviderKind::Disabled => EmbeddingProviderKind::Disabled,
2581 ProtoEmbeddingProviderKind::DeterministicLocal => {
2582 EmbeddingProviderKind::DeterministicLocal
2583 }
2584 ProtoEmbeddingProviderKind::Unspecified => {
2585 return Err(invalid_argument(
2586 "engine embedding provider kind is required",
2587 ));
2588 }
2589 },
2590 embedding_dimensions: value.embedding_dimensions as usize,
2591 graph_expansion_max_hops: value.graph_expansion_max_hops as u8,
2592 compaction_summarize_after_record_count: value.compaction_summarize_after_record_count
2593 as usize,
2594 compaction_cold_archive_after_days: value.compaction_cold_archive_after_days,
2595 compaction_cold_archive_importance_threshold_per_mille: value
2596 .compaction_cold_archive_importance_threshold_per_mille
2597 as u16,
2598 })
2599}
2600
2601fn recall_planning_trace_to_proto(value: RecallPlanningTrace) -> ProtoRecallPlanningTrace {
2602 ProtoRecallPlanningTrace {
2603 trace_id: value.trace_id,
2604 token_budget_applied: value.token_budget_applied,
2605 candidates: value
2606 .candidates
2607 .into_iter()
2608 .map(recall_trace_candidate_to_proto)
2609 .collect(),
2610 }
2611}
2612
2613fn recall_trace_candidate_to_proto(value: RecallTraceCandidate) -> ProtoRecallTraceCandidate {
2614 ProtoRecallTraceCandidate {
2615 record_id: value.record_id,
2616 kind: record_kind_to_proto(value.kind),
2617 selected: value.selected,
2618 matched_terms: value.matched_terms,
2619 decision_reason: value.decision_reason,
2620 breakdown: Some(recall_score_breakdown_to_proto(value.breakdown)),
2621 selection_rank: value.selection_rank,
2622 selected_channels: value.selected_channels,
2623 filter_reasons: value.filter_reasons,
2624 candidate_sources: value
2625 .candidate_sources
2626 .into_iter()
2627 .map(candidate_source_to_proto)
2628 .collect(),
2629 planner_stage: Some(planner_stage_to_proto(value.planner_stage)),
2630 }
2631}
2632
2633fn recall_score_breakdown_to_proto(
2634 value: mnemara_core::RecallScoreBreakdown,
2635) -> ProtoRecallScoreBreakdown {
2636 ProtoRecallScoreBreakdown {
2637 lexical: value.lexical,
2638 semantic: value.semantic,
2639 graph: value.graph,
2640 temporal: value.temporal,
2641 policy: value.policy,
2642 total: value.total,
2643 metadata: value.metadata,
2644 curation: value.curation,
2645 episodic: value.episodic,
2646 salience: value.salience,
2647 }
2648}
2649
2650fn namespace_stats_to_proto(value: NamespaceStats) -> ProtoNamespaceStats {
2651 ProtoNamespaceStats {
2652 tenant_id: value.tenant_id,
2653 namespace: value.namespace,
2654 active_records: value.active_records,
2655 archived_records: value.archived_records,
2656 deleted_records: value.deleted_records,
2657 suppressed_records: value.suppressed_records,
2658 pinned_records: value.pinned_records,
2659 }
2660}
2661
2662fn maintenance_stats_to_proto(value: MaintenanceStats) -> ProtoMaintenanceStats {
2663 ProtoMaintenanceStats {
2664 duplicate_candidate_groups: value.duplicate_candidate_groups,
2665 duplicate_candidate_records: value.duplicate_candidate_records,
2666 tombstoned_records: value.tombstoned_records,
2667 expired_records: value.expired_records,
2668 stale_idempotency_keys: value.stale_idempotency_keys,
2669 historical_records: value.historical_records,
2670 superseded_records: value.superseded_records,
2671 lineage_links: value.lineage_links,
2672 }
2673}
2674
2675fn portable_record_to_proto(value: PortableRecord) -> ProtoPortableRecord {
2676 ProtoPortableRecord {
2677 record: Some(record_to_proto(value.record)),
2678 idempotency_key: value.idempotency_key,
2679 }
2680}
2681
2682fn snapshot_manifest_to_proto(value: SnapshotManifest) -> SnapshotReply {
2683 SnapshotReply {
2684 snapshot_id: value.snapshot_id,
2685 created_at_unix_ms: value.created_at_unix_ms,
2686 namespaces: value.namespaces,
2687 record_count: value.record_count,
2688 storage_bytes: value.storage_bytes,
2689 engine: Some(engine_tuning_info_to_proto(value.engine)),
2690 }
2691}
2692
2693fn portable_package_to_proto(value: PortableStorePackage) -> ProtoExportReply {
2694 ProtoExportReply {
2695 package_version: value.package_version,
2696 exported_at_unix_ms: value.exported_at_unix_ms,
2697 manifest: Some(snapshot_manifest_to_proto(value.manifest)),
2698 records: value
2699 .records
2700 .into_iter()
2701 .map(portable_record_to_proto)
2702 .collect(),
2703 }
2704}
2705
2706fn import_mode_from_proto(value: i32) -> Result<ImportMode, Status> {
2707 match ProtoImportMode::try_from(value).unwrap_or(ProtoImportMode::Unspecified) {
2708 ProtoImportMode::Validate => Ok(ImportMode::Validate),
2709 ProtoImportMode::Merge => Ok(ImportMode::Merge),
2710 ProtoImportMode::Replace => Ok(ImportMode::Replace),
2711 ProtoImportMode::Unspecified => Err(invalid_argument("import mode is required")),
2712 }
2713}
2714
2715fn portable_record_from_proto(value: ProtoPortableRecord) -> Result<PortableRecord, Status> {
2716 Ok(PortableRecord {
2717 record: record_from_proto(
2718 value
2719 .record
2720 .ok_or_else(|| invalid_argument("portable record payload is required"))?,
2721 )?,
2722 idempotency_key: value.idempotency_key,
2723 })
2724}
2725
2726fn snapshot_manifest_from_proto(value: SnapshotReply) -> Result<SnapshotManifest, Status> {
2727 Ok(SnapshotManifest {
2728 snapshot_id: value.snapshot_id,
2729 created_at_unix_ms: value.created_at_unix_ms,
2730 namespaces: value.namespaces,
2731 record_count: value.record_count,
2732 storage_bytes: value.storage_bytes,
2733 engine: value
2734 .engine
2735 .map(engine_tuning_info_from_proto)
2736 .transpose()?
2737 .ok_or_else(|| invalid_argument("snapshot engine tuning info is required"))?,
2738 })
2739}
2740
2741fn portable_package_from_proto(value: ProtoExportReply) -> Result<PortableStorePackage, Status> {
2742 Ok(PortableStorePackage {
2743 package_version: value.package_version,
2744 exported_at_unix_ms: value.exported_at_unix_ms,
2745 manifest: snapshot_manifest_from_proto(
2746 value
2747 .manifest
2748 .ok_or_else(|| invalid_argument("portable export manifest is required"))?,
2749 )?,
2750 records: value
2751 .records
2752 .into_iter()
2753 .map(portable_record_from_proto)
2754 .collect::<Result<Vec<_>, _>>()?,
2755 })
2756}
2757
2758fn trace_operation_kind_to_proto(value: TraceOperationKind) -> i32 {
2759 match value {
2760 TraceOperationKind::Upsert => ProtoTraceOperationKind::Upsert as i32,
2761 TraceOperationKind::BatchUpsert => ProtoTraceOperationKind::BatchUpsert as i32,
2762 TraceOperationKind::Recall => ProtoTraceOperationKind::Recall as i32,
2763 TraceOperationKind::Snapshot => ProtoTraceOperationKind::Snapshot as i32,
2764 TraceOperationKind::Stats => ProtoTraceOperationKind::Stats as i32,
2765 TraceOperationKind::IntegrityCheck => ProtoTraceOperationKind::IntegrityCheck as i32,
2766 TraceOperationKind::Repair => ProtoTraceOperationKind::Repair as i32,
2767 TraceOperationKind::Compact => ProtoTraceOperationKind::Compact as i32,
2768 TraceOperationKind::Delete => ProtoTraceOperationKind::Delete as i32,
2769 TraceOperationKind::Archive => ProtoTraceOperationKind::Archive as i32,
2770 TraceOperationKind::Suppress => ProtoTraceOperationKind::Suppress as i32,
2771 TraceOperationKind::Recover => ProtoTraceOperationKind::Recover as i32,
2772 TraceOperationKind::Export => ProtoTraceOperationKind::Export as i32,
2773 TraceOperationKind::Import => ProtoTraceOperationKind::Import as i32,
2774 }
2775}
2776
2777fn trace_operation_kind_from_proto(value: ProtoTraceOperationKind) -> Option<TraceOperationKind> {
2778 match value {
2779 ProtoTraceOperationKind::Upsert => Some(TraceOperationKind::Upsert),
2780 ProtoTraceOperationKind::BatchUpsert => Some(TraceOperationKind::BatchUpsert),
2781 ProtoTraceOperationKind::Recall => Some(TraceOperationKind::Recall),
2782 ProtoTraceOperationKind::Snapshot => Some(TraceOperationKind::Snapshot),
2783 ProtoTraceOperationKind::Stats => Some(TraceOperationKind::Stats),
2784 ProtoTraceOperationKind::IntegrityCheck => Some(TraceOperationKind::IntegrityCheck),
2785 ProtoTraceOperationKind::Repair => Some(TraceOperationKind::Repair),
2786 ProtoTraceOperationKind::Compact => Some(TraceOperationKind::Compact),
2787 ProtoTraceOperationKind::Delete => Some(TraceOperationKind::Delete),
2788 ProtoTraceOperationKind::Archive => Some(TraceOperationKind::Archive),
2789 ProtoTraceOperationKind::Suppress => Some(TraceOperationKind::Suppress),
2790 ProtoTraceOperationKind::Recover => Some(TraceOperationKind::Recover),
2791 ProtoTraceOperationKind::Export => Some(TraceOperationKind::Export),
2792 ProtoTraceOperationKind::Import => Some(TraceOperationKind::Import),
2793 ProtoTraceOperationKind::Unspecified => None,
2794 }
2795}
2796
2797fn trace_status_to_proto(value: TraceStatus) -> i32 {
2798 match value {
2799 TraceStatus::Ok => ProtoTraceStatus::Ok as i32,
2800 TraceStatus::Rejected => ProtoTraceStatus::Rejected as i32,
2801 TraceStatus::Error => ProtoTraceStatus::Error as i32,
2802 }
2803}
2804
2805fn trace_status_from_proto(value: ProtoTraceStatus) -> Option<TraceStatus> {
2806 match value {
2807 ProtoTraceStatus::Ok => Some(TraceStatus::Ok),
2808 ProtoTraceStatus::Rejected => Some(TraceStatus::Rejected),
2809 ProtoTraceStatus::Error => Some(TraceStatus::Error),
2810 ProtoTraceStatus::Unspecified => None,
2811 }
2812}
2813
2814fn operation_trace_to_proto(value: OperationTrace) -> ProtoOperationTrace {
2815 ProtoOperationTrace {
2816 trace_id: value.trace_id,
2817 correlation_id: value.correlation_id,
2818 operation: trace_operation_kind_to_proto(value.operation),
2819 transport: value.transport,
2820 backend: value.backend,
2821 admission_class: value.admission_class,
2822 tenant_id: value.tenant_id,
2823 namespace: value.namespace,
2824 principal: value.principal,
2825 store_span_id: value.store_span_id,
2826 planning_trace_id: value.planning_trace_id,
2827 started_at_unix_ms: value.started_at_unix_ms,
2828 completed_at_unix_ms: value.completed_at_unix_ms,
2829 latency_ms: value.latency_ms,
2830 status: trace_status_to_proto(value.status),
2831 status_message: value.status_message,
2832 summary: Some(ProtoOperationTraceSummary {
2833 record_id: value.summary.record_id,
2834 request_count: value.summary.request_count,
2835 query_text: value.summary.query_text,
2836 max_items: value.summary.max_items,
2837 token_budget: value.summary.token_budget,
2838 dry_run: value.summary.dry_run,
2839 }),
2840 recall_explanation: value.recall_explanation.map(recall_explanation_to_proto),
2841 }
2842}
2843
2844fn kinds_from_proto(values: Vec<String>) -> Result<Vec<MemoryRecordKind>, Status> {
2845 values
2846 .into_iter()
2847 .map(|value| record_kind_from_proto(&value))
2848 .collect()
2849}
2850
2851fn trust_levels_from_proto(values: Vec<String>) -> Result<Vec<MemoryTrustLevel>, Status> {
2852 values
2853 .into_iter()
2854 .map(|value| trust_level_from_proto(&value))
2855 .collect()
2856}
2857
2858fn quality_states_from_proto(values: Vec<String>) -> Result<Vec<MemoryQualityState>, Status> {
2859 values
2860 .into_iter()
2861 .map(|value| quality_state_from_proto(&value))
2862 .collect()
2863}
2864
2865fn conflict_states_from_proto(values: Vec<String>) -> Result<Vec<ConflictReviewState>, Status> {
2866 values
2867 .into_iter()
2868 .map(|value| conflict_review_state_from_proto(&value))
2869 .collect()
2870}
2871
2872fn resolution_kinds_from_proto(values: Vec<String>) -> Result<Vec<ConflictResolutionKind>, Status> {
2873 values
2874 .into_iter()
2875 .map(|value| conflict_resolution_kind_from_proto(&value))
2876 .collect()
2877}
2878
2879fn recall_filters_from_proto(filters: Option<ProtoRecallFilters>) -> Result<RecallFilters, Status> {
2880 let Some(filters) = filters else {
2881 return Ok(RecallFilters::default());
2882 };
2883
2884 Ok(RecallFilters {
2885 kinds: kinds_from_proto(filters.kinds)?,
2886 required_labels: filters.required_labels,
2887 source: filters.source,
2888 from_unix_ms: filters.from_unix_ms,
2889 to_unix_ms: filters.to_unix_ms,
2890 min_importance_score: filters.min_importance_score,
2891 trust_levels: trust_levels_from_proto(filters.trust_levels)?,
2892 states: quality_states_from_proto(filters.states)?,
2893 include_archived: filters.include_archived,
2894 episode_id: filters.episode_id,
2895 continuity_states: filters
2896 .continuity_states
2897 .into_iter()
2898 .map(|value| continuity_state_from_proto(&value))
2899 .collect::<Result<Vec<_>, _>>()?,
2900 unresolved_only: filters.unresolved_only,
2901 temporal_order: temporal_order_from_proto(filters.temporal_order.as_deref().unwrap_or(""))?,
2902 historical_mode: historical_mode_from_proto(
2903 filters.historical_mode.as_deref().unwrap_or(""),
2904 )?,
2905 lineage_record_id: filters.lineage_record_id,
2906 before_record_id: filters.before_record_id,
2907 after_record_id: filters.after_record_id,
2908 boundary_labels: filters.boundary_labels,
2909 recurrence_key: filters.recurrence_key,
2910 conflict_states: conflict_states_from_proto(filters.conflict_states)?,
2911 resolution_kinds: resolution_kinds_from_proto(filters.resolution_kinds)?,
2912 unresolved_conflicts_only: filters.unresolved_conflicts_only,
2913 })
2914}
2915
2916fn validate_scope_limits(scope: &MemoryScope, limits: &ServerLimits) -> Result<(), Status> {
2917 if scope.labels.len() > limits.max_labels_per_scope {
2918 return Err(invalid_argument(format!(
2919 "scope labels exceed configured max of {}",
2920 limits.max_labels_per_scope
2921 )));
2922 }
2923 Ok(())
2924}
2925
2926fn validate_record_limits(record: &MemoryRecord, limits: &ServerLimits) -> Result<(), Status> {
2927 validate_scope_limits(&record.scope, limits)?;
2928 if record.content.len() > limits.max_record_content_bytes {
2929 return Err(invalid_argument(format!(
2930 "record content exceeds configured max of {} bytes",
2931 limits.max_record_content_bytes
2932 )));
2933 }
2934 if record.summary.as_deref().map(str::len).unwrap_or(0) > limits.max_query_text_bytes {
2935 return Err(invalid_argument(format!(
2936 "record summary exceeds configured max of {} bytes",
2937 limits.max_query_text_bytes
2938 )));
2939 }
2940 Ok(())
2941}
2942
2943fn validate_recall_limits(query: &RecallQuery, limits: &ServerLimits) -> Result<(), Status> {
2944 validate_scope_limits(&query.scope, limits)?;
2945 if query.max_items == 0 {
2946 return Err(invalid_argument("recall max_items must be greater than 0"));
2947 }
2948 if query.max_items > limits.max_recall_items {
2949 return Err(invalid_argument(format!(
2950 "recall max_items exceeds configured max of {}",
2951 limits.max_recall_items
2952 )));
2953 }
2954 if query.query_text.len() > limits.max_query_text_bytes {
2955 return Err(invalid_argument(format!(
2956 "query text exceeds configured max of {} bytes",
2957 limits.max_query_text_bytes
2958 )));
2959 }
2960 Ok(())
2961}
2962
2963fn map_store_error(error: mnemara_core::Error) -> Status {
2964 match error {
2965 mnemara_core::Error::InvalidConfig(message)
2966 | mnemara_core::Error::InvalidRequest(message) => invalid_argument(message),
2967 mnemara_core::Error::Conflict(message) => Status::already_exists(message),
2968 mnemara_core::Error::Unsupported(message) => Status::unimplemented(message),
2969 mnemara_core::Error::Backend(message) => internal_status(message),
2970 }
2971}
2972
2973fn map_http_store_error(error: mnemara_core::Error) -> (StatusCode, Json<HttpErrorBody>) {
2974 let status = match error {
2975 mnemara_core::Error::InvalidConfig(_) | mnemara_core::Error::InvalidRequest(_) => {
2976 StatusCode::BAD_REQUEST
2977 }
2978 mnemara_core::Error::Conflict(_) => StatusCode::CONFLICT,
2979 mnemara_core::Error::Unsupported(_) => StatusCode::NOT_IMPLEMENTED,
2980 mnemara_core::Error::Backend(_) => StatusCode::INTERNAL_SERVER_ERROR,
2981 };
2982 (
2983 status,
2984 Json(HttpErrorBody {
2985 error: error.to_string(),
2986 }),
2987 )
2988}
2989
2990fn map_http_validation_error(status: Status) -> (StatusCode, Json<HttpErrorBody>) {
2991 (
2992 StatusCode::BAD_REQUEST,
2993 Json(HttpErrorBody {
2994 error: status.message().to_string(),
2995 }),
2996 )
2997}
2998
2999fn record_grpc_result<T>(
3000 metrics: &MethodMetrics,
3001 result: Result<Response<T>, Status>,
3002) -> Result<Response<T>, Status> {
3003 match result {
3004 Ok(response) => {
3005 metrics.record_status(&Status::ok(""));
3006 Ok(response)
3007 }
3008 Err(status) => {
3009 metrics.record_status(&status);
3010 Err(status)
3011 }
3012 }
3013}
3014
3015#[tonic::async_trait]
3016impl<S> MemoryService for GrpcMemoryService<S>
3017where
3018 S: MemoryStore + 'static,
3019{
3020 async fn upsert_memory_record(
3021 &self,
3022 request: Request<ProtoUpsertMemoryRecordRequest>,
3023 ) -> Result<Response<UpsertMemoryRecordReply>, Status> {
3024 self.runtime.metrics().grpc_upsert.record_started();
3025 validate_grpc_auth(
3026 &request,
3027 self.runtime.auth().as_ref(),
3028 AuthPermission::Write,
3029 )?;
3030 let principal = grpc_principal(&request);
3031 let started_at_unix_ms = now_unix_ms();
3032 let correlation_id = self.runtime.traces().next_id("corr");
3033 let result = async {
3034 let request = request.into_inner();
3035 let record = record_from_proto(
3036 request
3037 .record
3038 .ok_or_else(|| invalid_argument("memory record is required"))?,
3039 )?;
3040 let tenant_id = record.scope.tenant_id.clone();
3041 let namespace = record.scope.namespace.clone();
3042 validate_record_limits(&record, self.runtime.limits().as_ref())?;
3043 let _permit = self
3044 .runtime
3045 .admission()
3046 .acquire(AdmissionClass::Write, Some(&tenant_id))
3047 .await
3048 .map_err(|error| {
3049 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3050 })?;
3051 let receipt = self
3052 .store
3053 .upsert(mnemara_core::UpsertRequest {
3054 record,
3055 idempotency_key: request.idempotency_key,
3056 })
3057 .await
3058 .map_err(map_store_error)?;
3059 record_trace(
3060 &self.runtime,
3061 TraceOperationKind::Upsert,
3062 "grpc",
3063 Some(tenant_id),
3064 Some(namespace),
3065 principal,
3066 started_at_unix_ms,
3067 TraceStatus::Ok,
3068 None,
3069 OperationTraceSummary {
3070 record_id: Some(receipt.record_id.clone()),
3071 ..OperationTraceSummary::default()
3072 },
3073 None,
3074 correlation_id,
3075 );
3076
3077 Ok(Response::new(UpsertMemoryRecordReply {
3078 record_id: receipt.record_id,
3079 deduplicated: receipt.deduplicated,
3080 summary_refreshed: receipt.summary_refreshed,
3081 }))
3082 }
3083 .await;
3084 record_grpc_result(&self.runtime.metrics().grpc_upsert, result)
3085 }
3086
3087 async fn batch_upsert_memory_records(
3088 &self,
3089 request: Request<BatchUpsertMemoryRecordsRequest>,
3090 ) -> Result<Response<BatchUpsertMemoryRecordsReply>, Status> {
3091 self.runtime.metrics().grpc_batch_upsert.record_started();
3092 validate_grpc_auth(
3093 &request,
3094 self.runtime.auth().as_ref(),
3095 AuthPermission::Write,
3096 )?;
3097 let principal = grpc_principal(&request);
3098 let started_at_unix_ms = now_unix_ms();
3099 let correlation_id = self.runtime.traces().next_id("corr");
3100 let result = async {
3101 let request = request.into_inner();
3102 if request.requests.len() > self.runtime.limits().max_batch_upsert_requests {
3103 return Err(invalid_argument(format!(
3104 "batch upsert request count exceeds configured max of {}",
3105 self.runtime.limits().max_batch_upsert_requests
3106 )));
3107 }
3108 let requests = request
3109 .requests
3110 .into_iter()
3111 .map(|request| {
3112 let record = record_from_proto(
3113 request
3114 .record
3115 .ok_or_else(|| invalid_argument("memory record is required"))?,
3116 )?;
3117 validate_record_limits(&record, self.runtime.limits().as_ref())?;
3118 Ok(mnemara_core::UpsertRequest {
3119 record,
3120 idempotency_key: request.idempotency_key,
3121 })
3122 })
3123 .collect::<Result<Vec<_>, Status>>()?;
3124 let tenant_id = requests
3125 .first()
3126 .map(|item| item.record.scope.tenant_id.clone());
3127 let namespace = requests
3128 .first()
3129 .map(|item| item.record.scope.namespace.clone());
3130 let _permit = self
3131 .runtime
3132 .admission()
3133 .acquire(AdmissionClass::Write, tenant_id.as_deref())
3134 .await
3135 .map_err(|error| {
3136 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3137 })?;
3138
3139 let receipts: Vec<UpsertMemoryRecordReply> = self
3140 .store
3141 .batch_upsert(BatchUpsertRequest { requests })
3142 .await
3143 .map_err(map_store_error)?
3144 .into_iter()
3145 .map(|receipt| UpsertMemoryRecordReply {
3146 record_id: receipt.record_id,
3147 deduplicated: receipt.deduplicated,
3148 summary_refreshed: receipt.summary_refreshed,
3149 })
3150 .collect();
3151 record_trace(
3152 &self.runtime,
3153 TraceOperationKind::BatchUpsert,
3154 "grpc",
3155 tenant_id,
3156 namespace,
3157 principal,
3158 started_at_unix_ms,
3159 TraceStatus::Ok,
3160 None,
3161 OperationTraceSummary {
3162 request_count: Some(receipts.len() as u32),
3163 ..OperationTraceSummary::default()
3164 },
3165 None,
3166 correlation_id,
3167 );
3168
3169 Ok(Response::new(BatchUpsertMemoryRecordsReply { receipts }))
3170 }
3171 .await;
3172 record_grpc_result(&self.runtime.metrics().grpc_batch_upsert, result)
3173 }
3174
3175 async fn recall(
3176 &self,
3177 request: Request<ProtoRecallRequest>,
3178 ) -> Result<Response<RecallReply>, Status> {
3179 self.runtime.metrics().grpc_recall.record_started();
3180 validate_grpc_auth(&request, self.runtime.auth().as_ref(), AuthPermission::Read)?;
3181 let principal = grpc_principal(&request);
3182 let started_at_unix_ms = now_unix_ms();
3183 let correlation_id = self.runtime.traces().next_id("corr");
3184 let result = async {
3185 let request = request.into_inner();
3186 let query = RecallQuery {
3187 scope: scope_from_proto(
3188 request
3189 .scope
3190 .ok_or_else(|| invalid_argument("recall scope is required"))?,
3191 )?,
3192 query_text: request.query_text,
3193 max_items: request.max_items as usize,
3194 token_budget: request.token_budget.map(|value| value as usize),
3195 filters: recall_filters_from_proto(request.filters)?,
3196 include_explanation: request.include_explanation,
3197 };
3198 validate_recall_limits(&query, self.runtime.limits().as_ref())?;
3199 let tenant_id = query.scope.tenant_id.clone();
3200 let namespace = query.scope.namespace.clone();
3201 let query_text = query.query_text.clone();
3202 let max_items = query.max_items as u32;
3203 let token_budget = query.token_budget.map(|value| value as u32);
3204 let _permit = self
3205 .runtime
3206 .admission()
3207 .acquire(AdmissionClass::Read, Some(&tenant_id))
3208 .await
3209 .map_err(|error| {
3210 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3211 })?;
3212 let mut result = self.store.recall(query).await.map_err(map_store_error)?;
3213 attach_correlation_id(&mut result, &correlation_id);
3214 let recall_explanation = result.explanation.clone();
3215
3216 let hits = result
3217 .hits
3218 .into_iter()
3219 .map(|hit| ProtoRecallHit {
3220 record: Some(record_to_proto(hit.record)),
3221 breakdown: Some(recall_score_breakdown_to_proto(hit.breakdown)),
3222 selected_channels: hit
3223 .explanation
3224 .as_ref()
3225 .map(|value| value.selected_channels.clone())
3226 .unwrap_or_default(),
3227 policy_notes: hit
3228 .explanation
3229 .as_ref()
3230 .map(|value| value.policy_notes.clone())
3231 .unwrap_or_default(),
3232 explanation: hit.explanation.map(recall_explanation_to_proto),
3233 })
3234 .collect();
3235 record_trace(
3236 &self.runtime,
3237 TraceOperationKind::Recall,
3238 "grpc",
3239 Some(tenant_id),
3240 Some(namespace),
3241 principal,
3242 started_at_unix_ms,
3243 TraceStatus::Ok,
3244 None,
3245 OperationTraceSummary {
3246 query_text: Some(query_text),
3247 max_items: Some(max_items),
3248 token_budget,
3249 ..OperationTraceSummary::default()
3250 },
3251 recall_explanation,
3252 correlation_id,
3253 );
3254
3255 Ok(Response::new(RecallReply {
3256 hits,
3257 total_candidates_examined: result.total_candidates_examined as u32,
3258 explanation: result.explanation.map(recall_explanation_to_proto),
3259 }))
3260 }
3261 .await;
3262 record_grpc_result(&self.runtime.metrics().grpc_recall, result)
3263 }
3264
3265 async fn compact(
3266 &self,
3267 request: Request<ProtoCompactRequest>,
3268 ) -> Result<Response<CompactReply>, Status> {
3269 self.runtime.metrics().grpc_compact.record_started();
3270 validate_grpc_auth(
3271 &request,
3272 self.runtime.auth().as_ref(),
3273 AuthPermission::Admin,
3274 )?;
3275 let principal = grpc_principal(&request);
3276 let started_at_unix_ms = now_unix_ms();
3277 let correlation_id = self.runtime.traces().next_id("corr");
3278 let result = async {
3279 let request = request.into_inner();
3280 let tenant_id = request.tenant_id.clone();
3281 let namespace = request.namespace.clone();
3282 let _permit = self
3283 .runtime
3284 .admission()
3285 .acquire(AdmissionClass::Admin, Some(&tenant_id))
3286 .await
3287 .map_err(|error| {
3288 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3289 })?;
3290 let report = self
3291 .store
3292 .compact(CompactionRequest {
3293 tenant_id,
3294 namespace: namespace.clone(),
3295 dry_run: request.dry_run,
3296 reason: request.reason,
3297 })
3298 .await
3299 .map_err(map_store_error)?;
3300 record_trace(
3301 &self.runtime,
3302 TraceOperationKind::Compact,
3303 "grpc",
3304 Some(request.tenant_id),
3305 namespace,
3306 principal,
3307 started_at_unix_ms,
3308 TraceStatus::Ok,
3309 None,
3310 OperationTraceSummary {
3311 dry_run: Some(request.dry_run),
3312 ..OperationTraceSummary::default()
3313 },
3314 None,
3315 correlation_id,
3316 );
3317
3318 Ok(Response::new(CompactReply {
3319 deduplicated_records: report.deduplicated_records,
3320 archived_records: report.archived_records,
3321 summarized_clusters: report.summarized_clusters,
3322 pruned_graph_edges: report.pruned_graph_edges,
3323 superseded_records: report.superseded_records,
3324 lineage_links_created: report.lineage_links_created,
3325 dry_run: report.dry_run,
3326 }))
3327 }
3328 .await;
3329 record_grpc_result(&self.runtime.metrics().grpc_compact, result)
3330 }
3331
3332 async fn snapshot(
3333 &self,
3334 request: Request<SnapshotRequest>,
3335 ) -> Result<Response<SnapshotReply>, Status> {
3336 self.runtime.metrics().grpc_snapshot.record_started();
3337 validate_grpc_auth(
3338 &request,
3339 self.runtime.auth().as_ref(),
3340 AuthPermission::Admin,
3341 )?;
3342 let principal = grpc_principal(&request);
3343 let started_at_unix_ms = now_unix_ms();
3344 let correlation_id = self.runtime.traces().next_id("corr");
3345 let result = async {
3346 let _permit = self
3347 .runtime
3348 .admission()
3349 .acquire(AdmissionClass::Admin, None)
3350 .await
3351 .map_err(|error| {
3352 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3353 })?;
3354 let manifest = self.store.snapshot().await.map_err(map_store_error)?;
3355 record_trace(
3356 &self.runtime,
3357 TraceOperationKind::Snapshot,
3358 "grpc",
3359 None,
3360 None,
3361 principal,
3362 started_at_unix_ms,
3363 TraceStatus::Ok,
3364 None,
3365 OperationTraceSummary::default(),
3366 None,
3367 correlation_id,
3368 );
3369 Ok(Response::new(SnapshotReply {
3370 snapshot_id: manifest.snapshot_id,
3371 created_at_unix_ms: manifest.created_at_unix_ms,
3372 namespaces: manifest.namespaces,
3373 record_count: manifest.record_count,
3374 storage_bytes: manifest.storage_bytes,
3375 engine: Some(engine_tuning_info_to_proto(manifest.engine)),
3376 }))
3377 }
3378 .await;
3379 record_grpc_result(&self.runtime.metrics().grpc_snapshot, result)
3380 }
3381
3382 async fn delete(
3383 &self,
3384 request: Request<ProtoDeleteRequest>,
3385 ) -> Result<Response<DeleteReply>, Status> {
3386 self.runtime.metrics().grpc_delete.record_started();
3387 validate_grpc_auth(
3388 &request,
3389 self.runtime.auth().as_ref(),
3390 AuthPermission::Admin,
3391 )?;
3392 let principal = grpc_principal(&request);
3393 let started_at_unix_ms = now_unix_ms();
3394 let correlation_id = self.runtime.traces().next_id("corr");
3395 let result = async {
3396 let request = request.into_inner();
3397 let tenant_id = request.tenant_id.clone();
3398 let namespace = request.namespace.clone();
3399 let _permit = self
3400 .runtime
3401 .admission()
3402 .acquire(AdmissionClass::Admin, Some(&tenant_id))
3403 .await
3404 .map_err(|error| {
3405 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3406 })?;
3407 let receipt = self
3408 .store
3409 .delete(DeleteRequest {
3410 tenant_id,
3411 namespace: namespace.clone(),
3412 record_id: request.record_id,
3413 hard_delete: request.hard_delete,
3414 audit_reason: request.audit_reason,
3415 })
3416 .await
3417 .map_err(map_store_error)?;
3418 record_trace(
3419 &self.runtime,
3420 TraceOperationKind::Delete,
3421 "grpc",
3422 Some(request.tenant_id),
3423 Some(namespace),
3424 principal,
3425 started_at_unix_ms,
3426 TraceStatus::Ok,
3427 None,
3428 OperationTraceSummary {
3429 record_id: Some(receipt.record_id.clone()),
3430 ..OperationTraceSummary::default()
3431 },
3432 None,
3433 correlation_id,
3434 );
3435
3436 Ok(Response::new(DeleteReply {
3437 record_id: receipt.record_id,
3438 tombstoned: receipt.tombstoned,
3439 hard_deleted: receipt.hard_deleted,
3440 }))
3441 }
3442 .await;
3443 record_grpc_result(&self.runtime.metrics().grpc_delete, result)
3444 }
3445
3446 async fn archive(
3447 &self,
3448 request: Request<ProtoArchiveRequest>,
3449 ) -> Result<Response<ArchiveReply>, Status> {
3450 self.runtime.metrics().grpc_archive.record_started();
3451 validate_grpc_auth(
3452 &request,
3453 self.runtime.auth().as_ref(),
3454 AuthPermission::Admin,
3455 )?;
3456 let principal = grpc_principal(&request);
3457 let started_at_unix_ms = now_unix_ms();
3458 let correlation_id = self.runtime.traces().next_id("corr");
3459 let result = async {
3460 let request = request.into_inner();
3461 let tenant_id = request.tenant_id.clone();
3462 let namespace = request.namespace.clone();
3463 let _permit = self
3464 .runtime
3465 .admission()
3466 .acquire(AdmissionClass::Admin, Some(&tenant_id))
3467 .await
3468 .map_err(|error| {
3469 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3470 })?;
3471 let receipt = self
3472 .store
3473 .archive(ArchiveRequest {
3474 tenant_id,
3475 namespace: namespace.clone(),
3476 record_id: request.record_id,
3477 dry_run: request.dry_run,
3478 audit_reason: request.audit_reason,
3479 })
3480 .await
3481 .map_err(map_store_error)?;
3482 record_trace(
3483 &self.runtime,
3484 TraceOperationKind::Archive,
3485 "grpc",
3486 Some(request.tenant_id),
3487 Some(namespace),
3488 principal,
3489 started_at_unix_ms,
3490 TraceStatus::Ok,
3491 None,
3492 OperationTraceSummary {
3493 record_id: Some(receipt.record_id.clone()),
3494 dry_run: Some(receipt.dry_run),
3495 ..OperationTraceSummary::default()
3496 },
3497 None,
3498 correlation_id,
3499 );
3500
3501 Ok(Response::new(ArchiveReply {
3502 record_id: receipt.record_id,
3503 previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3504 previous_historical_state: historical_state_to_proto(
3505 receipt.previous_historical_state,
3506 ),
3507 quality_state: quality_state_to_proto(receipt.quality_state),
3508 historical_state: historical_state_to_proto(receipt.historical_state),
3509 changed: receipt.changed,
3510 dry_run: receipt.dry_run,
3511 }))
3512 }
3513 .await;
3514 record_grpc_result(&self.runtime.metrics().grpc_archive, result)
3515 }
3516
3517 async fn suppress(
3518 &self,
3519 request: Request<ProtoSuppressRequest>,
3520 ) -> Result<Response<SuppressReply>, Status> {
3521 self.runtime.metrics().grpc_suppress.record_started();
3522 validate_grpc_auth(
3523 &request,
3524 self.runtime.auth().as_ref(),
3525 AuthPermission::Admin,
3526 )?;
3527 let principal = grpc_principal(&request);
3528 let started_at_unix_ms = now_unix_ms();
3529 let correlation_id = self.runtime.traces().next_id("corr");
3530 let result = async {
3531 let request = request.into_inner();
3532 let tenant_id = request.tenant_id.clone();
3533 let namespace = request.namespace.clone();
3534 let _permit = self
3535 .runtime
3536 .admission()
3537 .acquire(AdmissionClass::Admin, Some(&tenant_id))
3538 .await
3539 .map_err(|error| {
3540 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3541 })?;
3542 let receipt = self
3543 .store
3544 .suppress(SuppressRequest {
3545 tenant_id,
3546 namespace: namespace.clone(),
3547 record_id: request.record_id,
3548 dry_run: request.dry_run,
3549 audit_reason: request.audit_reason,
3550 })
3551 .await
3552 .map_err(map_store_error)?;
3553 record_trace(
3554 &self.runtime,
3555 TraceOperationKind::Suppress,
3556 "grpc",
3557 Some(request.tenant_id),
3558 Some(namespace),
3559 principal,
3560 started_at_unix_ms,
3561 TraceStatus::Ok,
3562 None,
3563 OperationTraceSummary {
3564 record_id: Some(receipt.record_id.clone()),
3565 dry_run: Some(receipt.dry_run),
3566 ..OperationTraceSummary::default()
3567 },
3568 None,
3569 correlation_id,
3570 );
3571
3572 Ok(Response::new(SuppressReply {
3573 record_id: receipt.record_id,
3574 previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3575 previous_historical_state: historical_state_to_proto(
3576 receipt.previous_historical_state,
3577 ),
3578 quality_state: quality_state_to_proto(receipt.quality_state),
3579 historical_state: historical_state_to_proto(receipt.historical_state),
3580 changed: receipt.changed,
3581 dry_run: receipt.dry_run,
3582 }))
3583 }
3584 .await;
3585 record_grpc_result(&self.runtime.metrics().grpc_suppress, result)
3586 }
3587
3588 async fn recover(
3589 &self,
3590 request: Request<ProtoRecoverRequest>,
3591 ) -> Result<Response<RecoverReply>, Status> {
3592 self.runtime.metrics().grpc_recover.record_started();
3593 validate_grpc_auth(
3594 &request,
3595 self.runtime.auth().as_ref(),
3596 AuthPermission::Admin,
3597 )?;
3598 let principal = grpc_principal(&request);
3599 let started_at_unix_ms = now_unix_ms();
3600 let correlation_id = self.runtime.traces().next_id("corr");
3601 let result = async {
3602 let request = request.into_inner();
3603 let tenant_id = request.tenant_id.clone();
3604 let namespace = request.namespace.clone();
3605 let _permit = self
3606 .runtime
3607 .admission()
3608 .acquire(AdmissionClass::Admin, Some(&tenant_id))
3609 .await
3610 .map_err(|error| {
3611 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3612 })?;
3613 let receipt = self
3614 .store
3615 .recover(RecoverRequest {
3616 tenant_id,
3617 namespace: namespace.clone(),
3618 record_id: request.record_id,
3619 dry_run: request.dry_run,
3620 audit_reason: request.audit_reason,
3621 quality_state: quality_state_from_proto(&request.quality_state)?,
3622 historical_state: request
3623 .historical_state
3624 .as_deref()
3625 .map(historical_state_from_proto)
3626 .transpose()?,
3627 })
3628 .await
3629 .map_err(map_store_error)?;
3630 record_trace(
3631 &self.runtime,
3632 TraceOperationKind::Recover,
3633 "grpc",
3634 Some(request.tenant_id),
3635 Some(namespace),
3636 principal,
3637 started_at_unix_ms,
3638 TraceStatus::Ok,
3639 None,
3640 OperationTraceSummary {
3641 record_id: Some(receipt.record_id.clone()),
3642 dry_run: Some(receipt.dry_run),
3643 ..OperationTraceSummary::default()
3644 },
3645 None,
3646 correlation_id,
3647 );
3648
3649 Ok(Response::new(RecoverReply {
3650 record_id: receipt.record_id,
3651 previous_quality_state: quality_state_to_proto(receipt.previous_quality_state),
3652 previous_historical_state: historical_state_to_proto(
3653 receipt.previous_historical_state,
3654 ),
3655 quality_state: quality_state_to_proto(receipt.quality_state),
3656 historical_state: historical_state_to_proto(receipt.historical_state),
3657 changed: receipt.changed,
3658 dry_run: receipt.dry_run,
3659 }))
3660 }
3661 .await;
3662 record_grpc_result(&self.runtime.metrics().grpc_recover, result)
3663 }
3664
3665 async fn stats(
3666 &self,
3667 request: Request<ProtoStoreStatsRequest>,
3668 ) -> Result<Response<ProtoStoreStatsReply>, Status> {
3669 self.runtime.metrics().grpc_stats.record_started();
3670 validate_grpc_auth(
3671 &request,
3672 self.runtime.auth().as_ref(),
3673 AuthPermission::Admin,
3674 )?;
3675 let principal = grpc_principal(&request);
3676 let started_at_unix_ms = now_unix_ms();
3677 let correlation_id = self.runtime.traces().next_id("corr");
3678 let result = async {
3679 let request = request.into_inner();
3680 let tenant_id = request.tenant_id.clone();
3681 let namespace = request.namespace.clone();
3682 let _permit = self
3683 .runtime
3684 .admission()
3685 .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3686 .await
3687 .map_err(|error| {
3688 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3689 })?;
3690 let report = self
3691 .store
3692 .stats(StoreStatsRequest {
3693 tenant_id: tenant_id.clone(),
3694 namespace: namespace.clone(),
3695 })
3696 .await
3697 .map_err(map_store_error)?;
3698 record_trace(
3699 &self.runtime,
3700 TraceOperationKind::Stats,
3701 "grpc",
3702 tenant_id,
3703 namespace,
3704 principal,
3705 started_at_unix_ms,
3706 TraceStatus::Ok,
3707 None,
3708 OperationTraceSummary::default(),
3709 None,
3710 correlation_id,
3711 );
3712
3713 Ok(Response::new(ProtoStoreStatsReply {
3714 generated_at_unix_ms: report.generated_at_unix_ms,
3715 total_records: report.total_records,
3716 storage_bytes: report.storage_bytes,
3717 namespaces: report
3718 .namespaces
3719 .into_iter()
3720 .map(namespace_stats_to_proto)
3721 .collect(),
3722 maintenance: Some(maintenance_stats_to_proto(report.maintenance)),
3723 engine: Some(engine_tuning_info_to_proto(report.engine)),
3724 }))
3725 }
3726 .await;
3727 record_grpc_result(&self.runtime.metrics().grpc_stats, result)
3728 }
3729
3730 async fn integrity_check(
3731 &self,
3732 request: Request<ProtoIntegrityCheckRequest>,
3733 ) -> Result<Response<ProtoIntegrityCheckReply>, Status> {
3734 self.runtime.metrics().grpc_integrity.record_started();
3735 validate_grpc_auth(
3736 &request,
3737 self.runtime.auth().as_ref(),
3738 AuthPermission::Admin,
3739 )?;
3740 let principal = grpc_principal(&request);
3741 let started_at_unix_ms = now_unix_ms();
3742 let correlation_id = self.runtime.traces().next_id("corr");
3743 let result = async {
3744 let request = request.into_inner();
3745 let tenant_id = request.tenant_id.clone();
3746 let namespace = request.namespace.clone();
3747 let _permit = self
3748 .runtime
3749 .admission()
3750 .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3751 .await
3752 .map_err(|error| {
3753 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3754 })?;
3755 let report = self
3756 .store
3757 .integrity_check(IntegrityCheckRequest {
3758 tenant_id: tenant_id.clone(),
3759 namespace: namespace.clone(),
3760 })
3761 .await
3762 .map_err(map_store_error)?;
3763 record_trace(
3764 &self.runtime,
3765 TraceOperationKind::IntegrityCheck,
3766 "grpc",
3767 tenant_id,
3768 namespace,
3769 principal,
3770 started_at_unix_ms,
3771 TraceStatus::Ok,
3772 None,
3773 OperationTraceSummary::default(),
3774 None,
3775 correlation_id,
3776 );
3777
3778 Ok(Response::new(ProtoIntegrityCheckReply {
3779 generated_at_unix_ms: report.generated_at_unix_ms,
3780 healthy: report.healthy,
3781 scanned_records: report.scanned_records,
3782 scanned_idempotency_keys: report.scanned_idempotency_keys,
3783 stale_idempotency_keys: report.stale_idempotency_keys,
3784 missing_idempotency_keys: report.missing_idempotency_keys,
3785 duplicate_active_records: report.duplicate_active_records,
3786 }))
3787 }
3788 .await;
3789 record_grpc_result(&self.runtime.metrics().grpc_integrity, result)
3790 }
3791
3792 async fn repair(
3793 &self,
3794 request: Request<ProtoRepairRequest>,
3795 ) -> Result<Response<ProtoRepairReply>, Status> {
3796 self.runtime.metrics().grpc_repair.record_started();
3797 validate_grpc_auth(
3798 &request,
3799 self.runtime.auth().as_ref(),
3800 AuthPermission::Admin,
3801 )?;
3802 let principal = grpc_principal(&request);
3803 let started_at_unix_ms = now_unix_ms();
3804 let correlation_id = self.runtime.traces().next_id("corr");
3805 let result = async {
3806 let request = request.into_inner();
3807 let tenant_id = request.tenant_id.clone();
3808 let namespace = request.namespace.clone();
3809 let _permit = self
3810 .runtime
3811 .admission()
3812 .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3813 .await
3814 .map_err(|error| {
3815 map_grpc_admission_error(self.runtime.metrics().as_ref(), error)
3816 })?;
3817 let report = self
3818 .store
3819 .repair(RepairRequest {
3820 tenant_id: tenant_id.clone(),
3821 namespace: namespace.clone(),
3822 dry_run: request.dry_run,
3823 reason: request.reason,
3824 remove_stale_idempotency_keys: request.remove_stale_idempotency_keys,
3825 rebuild_missing_idempotency_keys: request.rebuild_missing_idempotency_keys,
3826 })
3827 .await
3828 .map_err(map_store_error)?;
3829 record_trace(
3830 &self.runtime,
3831 TraceOperationKind::Repair,
3832 "grpc",
3833 tenant_id,
3834 namespace,
3835 principal,
3836 started_at_unix_ms,
3837 TraceStatus::Ok,
3838 None,
3839 OperationTraceSummary {
3840 dry_run: Some(request.dry_run),
3841 ..OperationTraceSummary::default()
3842 },
3843 None,
3844 correlation_id,
3845 );
3846
3847 Ok(Response::new(ProtoRepairReply {
3848 dry_run: report.dry_run,
3849 scanned_records: report.scanned_records,
3850 scanned_idempotency_keys: report.scanned_idempotency_keys,
3851 removed_stale_idempotency_keys: report.removed_stale_idempotency_keys,
3852 rebuilt_missing_idempotency_keys: report.rebuilt_missing_idempotency_keys,
3853 healthy_after: report.healthy_after,
3854 }))
3855 }
3856 .await;
3857 record_grpc_result(&self.runtime.metrics().grpc_repair, result)
3858 }
3859
3860 async fn list_traces(
3861 &self,
3862 request: Request<ProtoListTracesRequest>,
3863 ) -> Result<Response<ProtoListTracesReply>, Status> {
3864 validate_grpc_auth(
3865 &request,
3866 self.runtime.auth().as_ref(),
3867 AuthPermission::Admin,
3868 )?;
3869 let request = request.into_inner();
3870 self.runtime
3871 .metrics()
3872 .trace_reads
3873 .fetch_add(1, Ordering::Relaxed);
3874 let traces = self.runtime.traces().list(&TraceListRequest {
3875 tenant_id: request.tenant_id,
3876 namespace: request.namespace,
3877 operation: ProtoTraceOperationKind::try_from(request.operation)
3878 .ok()
3879 .and_then(trace_operation_kind_from_proto),
3880 status: ProtoTraceStatus::try_from(request.status)
3881 .ok()
3882 .and_then(trace_status_from_proto),
3883 before_started_at_unix_ms: request.before_started_at_unix_ms,
3884 limit: request.limit.map(|value| value as usize),
3885 });
3886 Ok(Response::new(ProtoListTracesReply {
3887 traces: traces.into_iter().map(operation_trace_to_proto).collect(),
3888 }))
3889 }
3890
3891 async fn get_trace(
3892 &self,
3893 request: Request<ProtoGetTraceRequest>,
3894 ) -> Result<Response<ProtoOperationTrace>, Status> {
3895 validate_grpc_auth(
3896 &request,
3897 self.runtime.auth().as_ref(),
3898 AuthPermission::Admin,
3899 )?;
3900 let trace_id = request.into_inner().trace_id;
3901 self.runtime
3902 .metrics()
3903 .trace_reads
3904 .fetch_add(1, Ordering::Relaxed);
3905 let trace = self
3906 .runtime
3907 .traces()
3908 .get(&trace_id)
3909 .ok_or_else(|| Status::not_found(format!("trace {trace_id} not found")))?;
3910 Ok(Response::new(operation_trace_to_proto(trace)))
3911 }
3912
3913 async fn export(
3914 &self,
3915 request: Request<ProtoExportRequest>,
3916 ) -> Result<Response<ProtoExportReply>, Status> {
3917 validate_grpc_auth(
3918 &request,
3919 self.runtime.auth().as_ref(),
3920 AuthPermission::Admin,
3921 )?;
3922 let principal = grpc_principal(&request);
3923 let started_at_unix_ms = now_unix_ms();
3924 let correlation_id = self.runtime.traces().next_id("corr");
3925 let request = request.into_inner();
3926 let export_request = ExportRequest {
3927 tenant_id: request.tenant_id,
3928 namespace: request.namespace,
3929 include_archived: request.include_archived,
3930 };
3931 let _permit = self
3932 .runtime
3933 .admission()
3934 .acquire(AdmissionClass::Admin, export_request.tenant_id.as_deref())
3935 .await
3936 .map_err(|error| map_grpc_admission_error(self.runtime.metrics().as_ref(), error))?;
3937 let package = self
3938 .store
3939 .export(export_request.clone())
3940 .await
3941 .map_err(map_store_error)?;
3942 record_trace(
3943 &self.runtime,
3944 TraceOperationKind::Export,
3945 "grpc",
3946 export_request.tenant_id,
3947 export_request.namespace,
3948 principal,
3949 started_at_unix_ms,
3950 TraceStatus::Ok,
3951 None,
3952 OperationTraceSummary::default(),
3953 None,
3954 correlation_id,
3955 );
3956 Ok(Response::new(portable_package_to_proto(package)))
3957 }
3958
3959 async fn import(
3960 &self,
3961 request: Request<ProtoImportRequest>,
3962 ) -> Result<Response<ProtoImportReply>, Status> {
3963 validate_grpc_auth(
3964 &request,
3965 self.runtime.auth().as_ref(),
3966 AuthPermission::Admin,
3967 )?;
3968 let principal = grpc_principal(&request);
3969 let started_at_unix_ms = now_unix_ms();
3970 let correlation_id = self.runtime.traces().next_id("corr");
3971 let request = request.into_inner();
3972 let import_request = ImportRequest {
3973 package: portable_package_from_proto(
3974 request
3975 .package
3976 .ok_or_else(|| invalid_argument("portable package is required"))?,
3977 )?,
3978 mode: import_mode_from_proto(request.mode)?,
3979 dry_run: request.dry_run,
3980 };
3981 let tenant_id = import_request
3982 .package
3983 .records
3984 .first()
3985 .map(|entry| entry.record.scope.tenant_id.clone());
3986 let namespace = import_request
3987 .package
3988 .records
3989 .first()
3990 .map(|entry| entry.record.scope.namespace.clone());
3991 let _permit = self
3992 .runtime
3993 .admission()
3994 .acquire(AdmissionClass::Admin, tenant_id.as_deref())
3995 .await
3996 .map_err(|error| map_grpc_admission_error(self.runtime.metrics().as_ref(), error))?;
3997 let report = self
3998 .store
3999 .import(import_request.clone())
4000 .await
4001 .map_err(map_store_error)?;
4002 record_trace(
4003 &self.runtime,
4004 TraceOperationKind::Import,
4005 "grpc",
4006 tenant_id,
4007 namespace,
4008 principal,
4009 started_at_unix_ms,
4010 if report.failed_records.is_empty() {
4011 TraceStatus::Ok
4012 } else {
4013 TraceStatus::Error
4014 },
4015 (!report.failed_records.is_empty())
4016 .then(|| format!("{} import validation failures", report.failed_records.len())),
4017 OperationTraceSummary {
4018 request_count: Some(import_request.package.records.len() as u32),
4019 dry_run: Some(import_request.dry_run),
4020 ..OperationTraceSummary::default()
4021 },
4022 None,
4023 correlation_id,
4024 );
4025 Ok(Response::new(ProtoImportReply {
4026 mode: request.mode,
4027 dry_run: report.dry_run,
4028 applied: report.applied,
4029 compatible_package: report.compatible_package,
4030 package_version: report.package_version,
4031 validated_records: report.validated_records,
4032 imported_records: report.imported_records,
4033 skipped_records: report.skipped_records,
4034 replaced_existing: report.replaced_existing,
4035 snapshot_id: report.snapshot_id,
4036 failed_records: report
4037 .failed_records
4038 .into_iter()
4039 .map(|failure| mnemara_protocol::v1::ImportFailure {
4040 record_id: failure.record_id,
4041 reason: failure.reason,
4042 })
4043 .collect(),
4044 }))
4045 }
4046}