1pub mod backup;
2pub mod catalog;
3pub mod checkpoint;
4pub mod commit;
5pub mod config;
6pub mod declarative;
7pub mod error;
8mod lib_helpers;
9#[cfg(test)]
10mod lib_tests;
11pub mod manifest;
12pub mod migration;
13pub mod offline;
14pub mod order_book;
15pub mod permission;
16pub mod preflight;
17pub mod query;
18pub mod recovery;
19pub mod repository;
20pub mod snapshot;
21pub mod storage;
22pub mod sync_bridge;
23pub mod version_store;
24pub mod wal;
25
26use crate::backup::{
27 BackupManifest, extract_backup_archive, load_backup_manifest, resolve_backup_path,
28 sha256_file_hex, verify_backup_files, write_backup_archive, write_backup_manifest,
29};
30use crate::catalog::namespace_key;
31use crate::catalog::schema::{AsyncIndexDef, IndexDef, TableSchema};
32use crate::catalog::types::{Row, Value};
33use crate::catalog::{DdlOperation, ResourceType};
34use crate::checkpoint::loader::load_checkpoint_with_key;
35use crate::checkpoint::writer::write_checkpoint_with_key;
36use crate::commit::executor::{CommitExecutor, CommitResult, ExecutorMetrics};
37use crate::commit::tx::{
38 ReadKey, ReadSet, ReadSetEntry, TransactionEnvelope, WriteClass, WriteIntent,
39};
40use crate::commit::validation::{
41 Mutation, TableUpdateExpr, validate_mutation_with_config, validate_permissions,
42};
43use crate::config::{AedbConfig, DurabilityMode, RecoveryMode};
44use crate::error::AedbError;
45use crate::error::ResourceType as ErrorResourceType;
46use crate::lib_helpers::*;
47use crate::manifest::atomic::write_manifest_atomic_signed;
48use crate::manifest::schema::{Manifest, SegmentMeta};
49use crate::migration::{
50 Migration, MigrationRecord, checksum_hex, decode_record, encode_record, migration_key,
51};
52use crate::order_book::{
53 ExecInstruction, FillSpec, InstrumentConfig, OrderBookDepth, OrderBookTableMode, OrderRecord,
54 OrderRequest, OrderSide, OrderType, Spread, TimeInForce, key_client_id, key_order,
55 read_last_execution_report, read_open_orders, read_order_status, read_recent_trades,
56 read_spread, read_top_n, scoped_instrument, u256_from_be,
57};
58use crate::permission::{CallerContext, Permission};
59use crate::preflight::{PreflightResult, preflight, preflight_plan};
60use crate::query::error::QueryError;
61use crate::query::executor::{QueryResult, execute_query_with_options};
62use crate::query::plan::{ConsistencyMode, Expr, Order, Query, QueryOptions};
63use crate::query::planner::{ExecutionStage, build_physical_plan};
64use crate::query::{KvCursor, KvScanResult, ScopedKvEntry};
65use crate::recovery::replay::replay_segments;
66use crate::recovery::{recover_at_seq_with_config, recover_with_config};
67use crate::snapshot::gc::{SnapshotHandle, SnapshotManager};
68use crate::snapshot::reader::SnapshotReadView;
69use crate::storage::encoded_key::EncodedKey;
70use crate::storage::keyspace::{Keyspace, KvEntry, NamespaceId};
71use crate::wal::frame::{FrameError, FrameReader};
72use crate::wal::segment::{SEGMENT_HEADER_SIZE, SegmentHeader};
73use parking_lot::Mutex;
74use serde::{Deserialize, Serialize};
75use std::collections::{BTreeSet, HashMap, VecDeque};
76use std::fs;
77use std::fs::File;
78use std::io::{BufReader, Read};
79use std::ops::Bound;
80use std::path::{Path, PathBuf};
81use std::sync::Arc;
82use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
83use std::time::{Duration, Instant};
84use tokio::sync::Mutex as AsyncMutex;
85use tracing::{info, warn};
86
/// Name of the JSON marker file, stored in the data directory, recording
/// whether the directory was ever opened in a reduced-trust configuration.
const TRUST_MODE_MARKER_FILE: &str = "trust_mode.json";

/// Sticky record of trust-relevant open modes for a data directory.
///
/// Flags are only ever set, never cleared; `enforce_and_record_trust_mode`
/// uses them to deny later strict opens of a directory that was previously
/// opened with weaker recovery/integrity guarantees.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct TrustModeMarker {
    // True if this directory was ever opened with non-strict recovery.
    #[serde(default)]
    ever_non_strict_recovery: bool,
    // True if this directory was ever opened with the WAL hash chain disabled.
    #[serde(default)]
    ever_hash_chain_disabled: bool,
}
96
97fn trust_mode_marker_path(dir: &Path) -> PathBuf {
98 dir.join(TRUST_MODE_MARKER_FILE)
99}
100
101fn load_trust_mode_marker(dir: &Path) -> Result<Option<TrustModeMarker>, AedbError> {
102 let path = trust_mode_marker_path(dir);
103 if !path.exists() {
104 return Ok(None);
105 }
106 let bytes = fs::read(&path)?;
107 let marker: TrustModeMarker =
108 serde_json::from_slice(&bytes).map_err(|e| AedbError::Validation(e.to_string()))?;
109 Ok(Some(marker))
110}
111
112fn persist_trust_mode_marker(dir: &Path, marker: &TrustModeMarker) -> Result<(), AedbError> {
113 let bytes = serde_json::to_vec(marker).map_err(|e| AedbError::Encode(e.to_string()))?;
114 fs::write(trust_mode_marker_path(dir), bytes)?;
115 Ok(())
116}
117
118fn enforce_and_record_trust_mode(dir: &Path, config: &AedbConfig) -> Result<(), AedbError> {
119 let mut marker = load_trust_mode_marker(dir)?.unwrap_or_default();
120 if config.strict_recovery()
121 && (marker.ever_non_strict_recovery || marker.ever_hash_chain_disabled)
122 {
123 return Err(AedbError::Validation(
124 "strict open denied: data directory was previously opened with non-strict recovery or hash-chain disabled"
125 .into(),
126 ));
127 }
128
129 let mut changed = false;
130 if !config.strict_recovery() && !marker.ever_non_strict_recovery {
131 marker.ever_non_strict_recovery = true;
132 changed = true;
133 }
134 if !config.hash_chain_required && !marker.ever_hash_chain_disabled {
135 marker.ever_hash_chain_disabled = true;
136 changed = true;
137 }
138 if changed {
139 persist_trust_mode_marker(dir, &marker)?;
140 }
141 Ok(())
142}
143
144fn create_private_dir_all(path: &Path) -> Result<(), AedbError> {
147 #[cfg(unix)]
148 {
149 use std::fs::DirBuilder;
150 use std::os::unix::fs::DirBuilderExt;
151 use std::os::unix::fs::PermissionsExt;
152
153 DirBuilder::new()
154 .recursive(true)
155 .mode(0o700) .create(path)?;
157 let metadata = fs::metadata(path)?;
158 if !metadata.is_dir() {
159 return Err(AedbError::Validation(format!(
160 "path is not a directory: {}",
161 path.display()
162 )));
163 }
164 let mut perms = metadata.permissions();
165 if perms.mode() != 0o700 {
166 perms.set_mode(0o700);
167 fs::set_permissions(path, perms)?;
168 }
169 }
170 #[cfg(not(unix))]
171 {
172 fs::create_dir_all(path)?;
173 }
174 Ok(())
175}
176
/// Handle to an open AEDB database rooted at `dir`.
///
/// Wraps the commit executor plus shared bookkeeping (snapshot manager,
/// recovery-view cache, lifecycle/telemetry hooks, durability counters).
pub struct AedbInstance {
    _config: AedbConfig,
    // When true (secure/production opens), unauthenticated commit paths are
    // rejected and callers must use the `*_as` variants.
    require_authenticated_calls: bool,
    dir: PathBuf,
    executor: CommitExecutor,
    checkpoint_lock: Arc<AsyncMutex<()>>,
    snapshot_manager: Arc<Mutex<SnapshotManager>>,
    // LRU cache of historical snapshot views (see `RecoveryCache`).
    recovery_cache: Arc<Mutex<RecoveryCache>>,
    lifecycle_hooks: Arc<Mutex<Vec<Arc<dyn LifecycleHook>>>>,
    telemetry_hooks: Arc<Mutex<Vec<Arc<dyn QueryCommitTelemetryHook>>>>,
    upstream_validation_rejections: Arc<AtomicU64>,
    // Counters feeding `OperationalMetrics` durable-wait stats.
    durable_wait_ops: Arc<AtomicU64>,
    durable_wait_micros: Arc<AtomicU64>,
    // Elects a single durable-ack waiter to force an fsync in
    // `enforce_finality`; others just wait for durability.
    durable_ack_fsync_leader: Arc<AtomicBool>,
    startup_recovery_micros: u64,
    startup_recovered_seq: u64,
}
195
/// How long a commit call should wait before returning to the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitFinality {
    /// Return as soon as the commit is visible to reads.
    Visible,
    /// Return only once the commit sequence is durable (see `enforce_finality`).
    Durable,
}

/// Point-in-time snapshot of operational counters and derived averages.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct OperationalMetrics {
    // Commit outcome counters.
    pub commits_total: u64,
    pub commit_errors: u64,
    pub permission_rejections: u64,
    pub validation_rejections: u64,
    pub queue_full_rejections: u64,
    pub timeout_rejections: u64,
    pub conflict_rejections: u64,
    pub read_set_conflicts: u64,
    pub conflict_rate: f64,
    pub avg_commit_latency_micros: u64,
    // Coordinator / WAL timing breakdowns.
    pub coordinator_apply_attempts: u64,
    pub avg_coordinator_apply_micros: u64,
    pub wal_append_ops: u64,
    pub wal_append_bytes: u64,
    pub avg_wal_append_micros: u64,
    pub wal_sync_ops: u64,
    pub avg_wal_sync_micros: u64,
    pub prestage_validate_ops: u64,
    pub avg_prestage_validate_micros: u64,
    pub epoch_process_ops: u64,
    pub avg_epoch_process_micros: u64,
    pub durable_wait_ops: u64,
    pub avg_durable_wait_micros: u64,
    // Current state gauges.
    pub inflight_commits: usize,
    pub queue_depth: usize,
    pub durable_head_lag: u64,
    pub visible_head_seq: u64,
    pub durable_head_seq: u64,
    pub current_seq: u64,
    pub snapshot_age_micros: u64,
    // Captured once at open time.
    pub startup_recovery_micros: u64,
    pub startup_recovered_seq: u64,
}
239
/// Summary of a project as reported by listing APIs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectInfo {
    pub project_id: String,
    pub scope_count: u32,
    pub created_at_micros: u64,
}

/// Summary of a scope within a project.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScopeInfo {
    pub scope_id: String,
    pub table_count: u32,
    pub kv_key_count: u64,
    pub created_at_micros: u64,
}

/// Summary of a table within a scope.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableInfo {
    pub table_name: String,
    pub column_count: u32,
    pub index_count: u32,
    pub row_count: u64,
}

/// Outcome of a single DDL operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DdlResult {
    // False when the operation was a no-op (e.g. already applied).
    pub applied: bool,
    pub seq: u64,
}

/// Outcome of a batch of DDL operations committed together at `seq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DdlBatchResult {
    pub seq: u64,
    pub results: Vec<DdlResult>,
}

/// Ordering strategy for a DDL batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DdlBatchOrderMode {
    /// Execute operations in the order the caller supplied them.
    AsProvided,
    /// Reorder operations so dependencies are created before dependents.
    DependencyAware,
}

/// Result of a mutate-where-returning call: the commit plus the affected
/// row's key and its before/after images.
#[derive(Debug, Clone)]
pub struct MutateWhereReturningResult {
    pub commit: CommitResult,
    pub primary_key: Vec<Value>,
    pub before: Row,
    pub after: Row,
}

/// Report of a migration run: which versions were applied (with name and
/// duration), which were skipped, and the resulting schema version.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationReport {
    pub applied: Vec<(u64, String, Duration)>,
    pub skipped: Vec<u64>,
    pub current_version: u64,
}
295
/// Catalog lifecycle events, recorded in the lifecycle outbox at commit time
/// and delivered to registered `LifecycleHook`s. Each variant carries the
/// commit sequence at which the change became visible.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum LifecycleEvent {
    ProjectCreated {
        project_id: String,
        seq: u64,
    },
    ProjectDropped {
        project_id: String,
        seq: u64,
    },
    ScopeCreated {
        project_id: String,
        scope_id: String,
        seq: u64,
    },
    ScopeDropped {
        project_id: String,
        scope_id: String,
        seq: u64,
    },
    TableCreated {
        project_id: String,
        scope_id: String,
        table_name: String,
        seq: u64,
    },
    TableDropped {
        project_id: String,
        scope_id: String,
        table_name: String,
        seq: u64,
    },
    TableAltered {
        project_id: String,
        scope_id: String,
        table_name: String,
        seq: u64,
    },
}

/// Observer for lifecycle events. Implementations must be panic-tolerant in
/// spirit: the dispatcher catches panics, logs, and continues.
pub trait LifecycleHook: Send + Sync {
    fn on_event(&self, event: &LifecycleEvent);
}
339
/// One page of a paginated list query, including the total match count and an
/// opaque cursor for fetching the next page (None when exhausted).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ListPageResult {
    pub rows: Vec<Row>,
    pub total_count: usize,
    pub next_cursor: Option<String>,
    // Snapshot sequence the page was read at.
    pub snapshot_seq: u64,
    pub rows_examined: usize,
}

/// Request for a paginated list query that also reports the total count.
#[derive(Debug, Clone)]
pub struct ListWithTotalRequest {
    pub project_id: String,
    pub scope_id: String,
    pub query: Query,
    // Resume token from a previous page; mutually interacts with `offset`.
    pub cursor: Option<String>,
    pub offset: Option<usize>,
    pub page_size: usize,
    pub consistency: ConsistencyMode,
}

/// Request for a two-phase read: run `source_query`, extract a key from
/// column index `source_key_index` of each row, then hydrate full rows via
/// `hydrate_query` matched on `hydrate_key_column`.
#[derive(Debug, Clone)]
pub struct LookupThenHydrateRequest {
    pub project_id: String,
    pub scope_id: String,
    pub source_query: Query,
    pub source_key_index: usize,
    pub hydrate_query: Query,
    pub hydrate_key_column: String,
    pub consistency: ConsistencyMode,
}
370
/// Request to update rows matching `predicate` with literal column values,
/// optionally capped at `limit` rows.
#[derive(Debug, Clone)]
pub struct UpdateWhereRequest {
    pub caller: CallerContext,
    pub project_id: String,
    pub scope_id: String,
    pub table_name: String,
    pub predicate: Expr,
    pub updates: Vec<(String, Value)>,
    pub limit: Option<usize>,
}

/// Like `UpdateWhereRequest`, but updates are computed expressions
/// (`TableUpdateExpr`) rather than literal values.
#[derive(Debug, Clone)]
pub struct UpdateWhereExprRequest {
    pub caller: CallerContext,
    pub project_id: String,
    pub scope_id: String,
    pub table_name: String,
    pub predicate: Expr,
    pub updates: Vec<(String, TableUpdateExpr)>,
    pub limit: Option<usize>,
}

/// Request to adjust a 256-bit unsigned column of one row by `amount_be`
/// (big-endian 32-byte amount).
#[derive(Debug, Clone)]
pub struct TableU256MutationRequest {
    pub caller: CallerContext,
    pub project_id: String,
    pub scope_id: String,
    pub table_name: String,
    pub primary_key: Vec<Value>,
    pub column: String,
    pub amount_be: [u8; 32],
}

/// Request to replace one row only if its current sequence equals
/// `expected_seq` (optimistic concurrency control).
#[derive(Debug, Clone)]
pub struct CompareAndSwapRequest {
    pub caller: CallerContext,
    pub project_id: String,
    pub scope_id: String,
    pub table_name: String,
    pub primary_key: Vec<Value>,
    pub row: Row,
    pub expected_seq: u64,
}

/// One query plus its options, for batched query execution.
#[derive(Debug, Clone)]
pub struct QueryBatchItem {
    pub query: Query,
    pub options: QueryOptions,
}
420
/// Planner/executor diagnostics attached to a query result: which indexes
/// were chosen, how the predicate was evaluated, and the plan trace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryDiagnostics {
    pub snapshot_seq: u64,
    pub estimated_scan_rows: u64,
    pub max_scan_rows: u64,
    pub index_used: Option<String>,
    pub selected_indexes: Vec<String>,
    pub predicate_evaluation_path: PredicateEvaluationPath,
    pub plan_trace: Vec<String>,
    pub stages: Vec<ExecutionStage>,
    pub bounded_by_limit_or_cursor: bool,
    pub has_joins: bool,
}

/// Strategy the executor used to evaluate a query's predicate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PredicateEvaluationPath {
    None,
    PrimaryKeyEqLookup,
    SecondaryIndexLookup,
    AsyncIndexProjection,
    FullScanFilter,
    JoinExecution,
}

/// Query result bundled with its diagnostics.
#[derive(Debug, Clone)]
pub struct QueryWithDiagnosticsResult {
    pub result: QueryResult,
    pub diagnostics: QueryDiagnostics,
}

/// A planned SQL transaction: mutations to apply atomically against the
/// snapshot identified by `base_seq`, optionally on behalf of `caller`.
#[derive(Debug, Clone)]
pub struct SqlTransactionPlan {
    pub base_seq: u64,
    pub caller: Option<CallerContext>,
    pub mutations: Vec<Mutation>,
}

/// Optional observer for query and commit telemetry; default impls ignore
/// both event kinds, so implementors override only what they need.
pub trait QueryCommitTelemetryHook: Send + Sync {
    fn on_query(&self, _event: &QueryTelemetryEvent) {}
    fn on_commit(&self, _event: &CommitTelemetryEvent) {}
}
462
/// Telemetry emitted for each query execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryTelemetryEvent {
    pub project_id: String,
    pub scope_id: String,
    pub table: String,
    pub snapshot_seq: u64,
    pub rows_examined: usize,
    pub latency_micros: u64,
    pub ok: bool,
    // Error string when `ok` is false.
    pub error: Option<String>,
}

/// Telemetry emitted for each commit attempt on a named operation path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitTelemetryEvent {
    // Static name of the commit entry point (e.g. "commit_as").
    pub op: &'static str,
    pub commit_seq: Option<u64>,
    pub durable_head_seq: Option<u64>,
    pub latency_micros: u64,
    pub ok: bool,
    pub error: Option<String>,
}

/// Pluggable translator from read-only SQL text to an internal query plan.
pub trait ReadOnlySqlAdapter: Send + Sync {
    fn execute_read_only(
        &self,
        project_id: &str,
        scope_id: &str,
        sql: &str,
    ) -> Result<(Query, QueryOptions), QueryError>;
}

/// Pluggable remote storage for backup archives: upload a local backup
/// directory, or materialize a backup chain locally for restore.
pub trait RemoteBackupAdapter: Send + Sync {
    fn store_backup_dir(&self, uri: &str, backup_dir: &Path) -> Result<(), AedbError>;
    fn materialize_backup_chain(
        &self,
        uri: &str,
        scratch_dir: &Path,
    ) -> Result<Vec<PathBuf>, AedbError>;
}
502
/// RAII lease on a snapshot: releases the handle back to the manager (and
/// triggers a GC pass) on drop.
struct SnapshotLease {
    manager: Arc<Mutex<SnapshotManager>>,
    handle: SnapshotHandle,
    view: SnapshotReadView,
}

/// A read transaction pinned to one snapshot lease, optionally scoped to a
/// caller for permission checks.
pub struct ReadTx<'a> {
    db: &'a AedbInstance,
    lease: SnapshotLease,
    caller: Option<CallerContext>,
}

// Recovery-view cache sizing: at most 16 cached views, each expiring 60s
// after last use.
const RECOVERY_CACHE_CAPACITY: usize = 16;
const RECOVERY_CACHE_TTL: Duration = Duration::from_secs(60);
const SYSTEM_CALLER_ID: &str = "system";
// Cap on optimistic retries in mutate-where-returning loops.
const MAX_MUTATE_WHERE_RETURNING_RETRIES: usize = 16;
519
/// LRU cache of historical snapshot views keyed by sequence number.
///
/// Recency is tracked with a monotonically increasing generation counter:
/// `order` holds (seq, generation) touch records (possibly stale), and an
/// entry is only evicted when the popped record's generation matches the
/// entry's current generation (lazy deletion).
#[derive(Debug, Default)]
struct RecoveryCache {
    // Touch records in insertion order; may contain stale generations.
    order: VecDeque<(u64, u64)>,
    entries: HashMap<u64, RecoveryCacheEntry>,
    next_generation: u64,
}

/// One cached snapshot view with its last-touch time and generation.
#[derive(Debug)]
struct RecoveryCacheEntry {
    view: SnapshotReadView,
    // Refreshed on every cache hit; drives TTL expiry.
    created: Instant,
    generation: u64,
}
533
impl RecoveryCache {
    /// Returns a clone of the cached view for `seq`, refreshing its TTL and
    /// recency; `None` on miss (including TTL expiry).
    fn get(&mut self, seq: u64) -> Option<SnapshotReadView> {
        self.prune_expired();
        let generation = self.bump_generation();
        let view = {
            // `?` here early-returns None on a cache miss.
            let entry = self.entries.get_mut(&seq)?;
            entry.created = Instant::now();
            entry.generation = generation;
            entry.view.clone()
        };
        // Record the touch; older (seq, generation) records become stale and
        // are ignored by `evict_to_capacity`.
        self.order.push_back((seq, generation));
        self.compact_order_if_needed();
        Some(view)
    }

    /// Inserts (or replaces) the view for `seq` and evicts least-recently
    /// touched entries beyond capacity.
    fn put(&mut self, seq: u64, view: SnapshotReadView) {
        self.prune_expired();
        let generation = self.bump_generation();
        self.entries.insert(
            seq,
            RecoveryCacheEntry {
                view,
                created: Instant::now(),
                generation,
            },
        );
        self.order.push_back((seq, generation));
        self.evict_to_capacity();
        self.compact_order_if_needed();
    }

    /// Removes entries whose last touch is older than RECOVERY_CACHE_TTL.
    /// Their `order` records become stale and are cleaned up lazily.
    fn prune_expired(&mut self) {
        let now = Instant::now();
        let expired: Vec<u64> = self
            .entries
            .iter()
            .filter_map(|(seq, entry)| {
                if now.duration_since(entry.created) > RECOVERY_CACHE_TTL {
                    Some(*seq)
                } else {
                    None
                }
            })
            .collect();
        for seq in expired {
            self.entries.remove(&seq);
        }
        self.compact_order_if_needed();
    }

    /// Returns the next generation value (wrapping on overflow).
    fn bump_generation(&mut self) -> u64 {
        let generation = self.next_generation;
        self.next_generation = self.next_generation.wrapping_add(1);
        generation
    }

    /// Pops touch records oldest-first until the entry count fits capacity;
    /// a record only evicts its entry when the generation still matches
    /// (i.e. the entry was not touched again since).
    fn evict_to_capacity(&mut self) {
        while self.entries.len() > RECOVERY_CACHE_CAPACITY {
            let Some((seq, generation)) = self.order.pop_front() else {
                break;
            };
            let should_remove = self
                .entries
                .get(&seq)
                .map(|entry| entry.generation == generation)
                .unwrap_or(false);
            if should_remove {
                self.entries.remove(&seq);
            }
        }
    }

    /// Rebuilds `order` from live entries (sorted by generation) once stale
    /// touch records exceed 8x capacity, bounding memory growth.
    fn compact_order_if_needed(&mut self) {
        let max_order_len = RECOVERY_CACHE_CAPACITY.saturating_mul(8);
        if self.order.len() <= max_order_len {
            return;
        }
        let mut live: Vec<(u64, u64)> = self
            .entries
            .iter()
            .map(|(seq, entry)| (*seq, entry.generation))
            .collect();
        live.sort_by_key(|(_, generation)| *generation);
        self.order = live.into_iter().collect();
    }
}
620
impl Drop for SnapshotLease {
    /// Releases the snapshot handle and opportunistically runs GC; a GC
    /// failure here is intentionally ignored (best-effort cleanup).
    fn drop(&mut self) {
        let mut mgr = self.manager.lock();
        mgr.release(self.handle);
        let _ = mgr.gc();
    }
}
628
629impl AedbInstance {
630 pub fn open_production(config: AedbConfig, dir: &Path) -> Result<Self, AedbError> {
631 validate_arcana_config(&config)?;
632 Self::open_internal(config, dir, true)
633 }
634
635 pub fn open_secure(config: AedbConfig, dir: &Path) -> Result<Self, AedbError> {
636 validate_secure_config(&config)?;
637 Self::open_internal(config, dir, true)
638 }
639
640 pub fn open(config: AedbConfig, dir: &Path) -> Result<Self, AedbError> {
641 Self::open_internal(config, dir, false)
642 }
643
    /// Shared open path: validates config, logs the effective settings,
    /// prepares the private data directory, enforces the trust-mode marker,
    /// recovers existing state (or bootstraps fresh state), and assembles the
    /// instance.
    fn open_internal(
        config: AedbConfig,
        dir: &Path,
        require_authenticated_calls: bool,
    ) -> Result<Self, AedbError> {
        validate_config(&config)?;
        // One structured log line with every effective knob, for supportability.
        info!(
            max_segment_bytes = config.max_segment_bytes,
            max_segment_age_secs = config.max_segment_age_secs,
            durability_mode = ?config.durability_mode,
            batch_interval_ms = config.batch_interval_ms,
            batch_max_bytes = config.batch_max_bytes,
            idempotency_window_seconds = config.idempotency_window_seconds,
            idempotency_window_commits = config.idempotency_window_commits,
            max_inflight_commits = config.max_inflight_commits,
            max_commit_queue_bytes = config.max_commit_queue_bytes,
            max_transaction_bytes = config.max_transaction_bytes,
            commit_timeout_ms = config.commit_timeout_ms,
            durable_ack_coalescing_enabled = config.durable_ack_coalescing_enabled,
            durable_ack_coalesce_window_us = config.durable_ack_coalesce_window_us,
            max_snapshot_age_ms = config.max_snapshot_age_ms,
            max_concurrent_snapshots = config.max_concurrent_snapshots,
            max_scan_rows = config.max_scan_rows,
            max_kv_key_bytes = config.max_kv_key_bytes,
            max_kv_value_bytes = config.max_kv_value_bytes,
            max_memory_estimate_bytes = config.max_memory_estimate_bytes,
            epoch_max_wait_us = config.epoch_max_wait_us,
            epoch_min_commits = config.epoch_min_commits,
            epoch_max_commits = config.epoch_max_commits,
            adaptive_epoch_enabled = config.adaptive_epoch_enabled,
            adaptive_epoch_min_commits_floor = config.adaptive_epoch_min_commits_floor,
            adaptive_epoch_min_commits_ceiling = config.adaptive_epoch_min_commits_ceiling,
            adaptive_epoch_wait_us_floor = config.adaptive_epoch_wait_us_floor,
            adaptive_epoch_wait_us_ceiling = config.adaptive_epoch_wait_us_ceiling,
            adaptive_epoch_target_latency_us = config.adaptive_epoch_target_latency_us,
            parallel_apply_enabled = config.parallel_apply_enabled,
            parallel_worker_threads = config.parallel_worker_threads,
            coordinator_locking_enabled = config.coordinator_locking_enabled,
            global_unique_index_enabled = config.global_unique_index_enabled,
            partition_lock_timeout_ms = config.partition_lock_timeout_ms,
            epoch_apply_timeout_ms = config.epoch_apply_timeout_ms,
            checkpoint_encryption_enabled = config.checkpoint_encryption_key.is_some(),
            checkpoint_key_id = config.checkpoint_key_id.as_deref().unwrap_or(""),
            checkpoint_compression_level = config.checkpoint_compression_level,
            manifest_hmac_enabled = config.manifest_hmac_key.is_some(),
            recovery_mode = ?config.recovery_mode,
            hash_chain_required = config.hash_chain_required,
            primary_index_backend = ?config.primary_index_backend,
            "aedb config"
        );
        create_private_dir_all(dir)?;
        enforce_and_record_trust_mode(dir, &config)?;
        // Any ".aedb" file in the directory means prior state exists and a
        // recovery pass is required before serving.
        let has_existing = fs::read_dir(dir)?
            .filter_map(|e| e.ok())
            .any(|e| e.file_name().to_string_lossy().contains(".aedb"));

        let recovery_start = std::time::SystemTime::now();
        let (executor, startup_recovered_seq) = if has_existing {
            let mut recovered = recover_with_config(dir, &config)?;
            recovered.keyspace.set_backend(config.primary_index_backend);
            if require_authenticated_calls {
                // Secure mode needs a seeded system admin in the catalog.
                seed_system_global_admin(&mut recovered.catalog);
            }
            (
                CommitExecutor::with_state(
                    dir,
                    recovered.keyspace,
                    recovered.catalog,
                    recovered.current_seq,
                    // Next assignable sequence follows the recovered head.
                    recovered.current_seq + 1,
                    config.clone(),
                    recovered.idempotency,
                )?,
                recovered.current_seq,
            )
        } else {
            // Fresh directory: empty catalog/keyspace, sequences start at 0/1.
            let mut catalog = crate::catalog::Catalog::default();
            if require_authenticated_calls {
                seed_system_global_admin(&mut catalog);
            }
            (
                CommitExecutor::with_state(
                    dir,
                    crate::storage::keyspace::Keyspace::with_backend(config.primary_index_backend),
                    catalog,
                    0,
                    1,
                    config.clone(),
                    std::collections::HashMap::new(),
                )?,
                0,
            )
        };
        // NOTE(review): wall-clock elapsed; a backwards clock step collapses
        // this to 0 via unwrap_or_default.
        let startup_recovery_micros =
            recovery_start.elapsed().unwrap_or_default().as_micros() as u64;

        Ok(Self {
            _config: config,
            require_authenticated_calls,
            dir: dir.to_path_buf(),
            executor,
            checkpoint_lock: Arc::new(AsyncMutex::new(())),
            snapshot_manager: Arc::new(Mutex::new(SnapshotManager::default())),
            recovery_cache: Arc::new(Mutex::new(RecoveryCache::default())),
            lifecycle_hooks: Arc::new(Mutex::new(Vec::new())),
            telemetry_hooks: Arc::new(Mutex::new(Vec::new())),
            upstream_validation_rejections: Arc::new(AtomicU64::new(0)),
            durable_wait_ops: Arc::new(AtomicU64::new(0)),
            durable_wait_micros: Arc::new(AtomicU64::new(0)),
            durable_ack_fsync_leader: Arc::new(AtomicBool::new(false)),
            startup_recovery_micros,
            startup_recovered_seq,
        })
    }
758
759 pub async fn commit(&self, mutation: Mutation) -> Result<CommitResult, AedbError> {
760 let started = Instant::now();
761 if self.require_authenticated_calls {
762 return Err(AedbError::PermissionDenied(
763 "authenticated caller required; use commit_as in secure mode".into(),
764 ));
765 }
766 crate::commit::validation::validate_kv_sizes_early(&mutation, &self._config)?;
768
769 let result = self.executor.submit(mutation).await;
770 self.emit_commit_telemetry("commit", started, &result);
771 let result = result?;
772 self.dispatch_lifecycle_events_for_commit(result.commit_seq)
773 .await;
774 Ok(result)
775 }
776
777 async fn commit_prevalidated_internal(
778 &self,
779 op_name: &'static str,
780 mutation: Mutation,
781 ) -> Result<CommitResult, AedbError> {
782 let started = Instant::now();
783 if self.require_authenticated_calls {
784 return Err(AedbError::PermissionDenied(
785 "authenticated caller required; use commit_as in secure mode".into(),
786 ));
787 }
788 crate::commit::validation::validate_kv_sizes_early(&mutation, &self._config)?;
789 let result = self.executor.submit_prevalidated(mutation).await;
790 self.emit_commit_telemetry(op_name, started, &result);
791 result
792 }
793
794 async fn commit_prevalidated_internal_with_finality(
795 &self,
796 op_name: &'static str,
797 mutation: Mutation,
798 finality: CommitFinality,
799 ) -> Result<CommitResult, AedbError> {
800 let mut result = self.commit_prevalidated_internal(op_name, mutation).await?;
801 self.enforce_finality(&mut result, finality).await?;
802 Ok(result)
803 }
804
805 pub async fn commit_with_finality(
806 &self,
807 mutation: Mutation,
808 finality: CommitFinality,
809 ) -> Result<CommitResult, AedbError> {
810 let mut result = self.commit(mutation).await?;
811 self.enforce_finality(&mut result, finality).await?;
812 Ok(result)
813 }
814
815 pub async fn commit_with_preflight(
816 &self,
817 mutation: Mutation,
818 ) -> Result<CommitResult, AedbError> {
819 let started = Instant::now();
820 if self.require_authenticated_calls {
821 return Err(AedbError::PermissionDenied(
822 "authenticated caller required; use commit_as_with_preflight in secure mode".into(),
823 ));
824 }
825 crate::commit::validation::validate_kv_sizes_early(&mutation, &self._config)?;
826 let plan = self.preflight_plan(mutation).await;
827 let result = commit_from_preflight_plan(self, None, plan).await;
828 self.emit_commit_telemetry("commit_with_preflight", started, &result);
829 result
830 }
831
832 pub async fn upsert_batch(
833 &self,
834 project_id: &str,
835 scope_id: &str,
836 table_name: &str,
837 rows: Vec<crate::catalog::types::Row>,
838 ) -> Result<CommitResult, AedbError> {
839 self.commit(Mutation::UpsertBatch {
840 project_id: project_id.to_string(),
841 scope_id: scope_id.to_string(),
842 table_name: table_name.to_string(),
843 rows,
844 })
845 .await
846 }
847
848 pub async fn insert(
849 &self,
850 project_id: &str,
851 scope_id: &str,
852 table_name: &str,
853 primary_key: Vec<crate::catalog::types::Value>,
854 row: crate::catalog::types::Row,
855 ) -> Result<CommitResult, AedbError> {
856 self.commit(Mutation::Insert {
857 project_id: project_id.to_string(),
858 scope_id: scope_id.to_string(),
859 table_name: table_name.to_string(),
860 primary_key,
861 row,
862 })
863 .await
864 }
865
866 pub async fn insert_batch(
867 &self,
868 project_id: &str,
869 scope_id: &str,
870 table_name: &str,
871 rows: Vec<crate::catalog::types::Row>,
872 ) -> Result<CommitResult, AedbError> {
873 self.commit(Mutation::InsertBatch {
874 project_id: project_id.to_string(),
875 scope_id: scope_id.to_string(),
876 table_name: table_name.to_string(),
877 rows,
878 })
879 .await
880 }
881
882 pub async fn commit_as(
883 &self,
884 caller: CallerContext,
885 mutation: Mutation,
886 ) -> Result<CommitResult, AedbError> {
887 let started = Instant::now();
888 ensure_external_caller_allowed(&caller)?;
889 crate::commit::validation::validate_kv_sizes_early(&mutation, &self._config)?;
891
892 let result = self.executor.submit_as(Some(caller), mutation).await;
893 self.emit_commit_telemetry("commit_as", started, &result);
894 let result = result?;
895 self.dispatch_lifecycle_events_for_commit(result.commit_seq)
896 .await;
897 Ok(result)
898 }
899
900 pub async fn commit_as_with_finality(
901 &self,
902 caller: CallerContext,
903 mutation: Mutation,
904 finality: CommitFinality,
905 ) -> Result<CommitResult, AedbError> {
906 let mut result = self.commit_as(caller, mutation).await?;
907 self.enforce_finality(&mut result, finality).await?;
908 Ok(result)
909 }
910
911 pub async fn commit_as_with_preflight(
912 &self,
913 caller: CallerContext,
914 mutation: Mutation,
915 ) -> Result<CommitResult, AedbError> {
916 let started = Instant::now();
917 ensure_external_caller_allowed(&caller)?;
918 crate::commit::validation::validate_kv_sizes_early(&mutation, &self._config)?;
919 let plan = self.preflight_plan_as(&caller, mutation).await?;
920 let result = commit_from_preflight_plan(self, Some(caller), plan).await;
921 self.emit_commit_telemetry("commit_as_with_preflight", started, &result);
922 result
923 }
924
925 pub async fn commit_envelope(
926 &self,
927 envelope: TransactionEnvelope,
928 ) -> Result<CommitResult, AedbError> {
929 let started = Instant::now();
930 if self.require_authenticated_calls && envelope.caller.is_none() {
931 return Err(AedbError::PermissionDenied(
932 "authenticated caller required; provide envelope.caller in secure mode".into(),
933 ));
934 }
935 if let Some(caller) = envelope.caller.as_ref() {
936 if caller.is_internal_system() {
937 return Err(AedbError::PermissionDenied(
938 "internal system caller requires trusted commit path".into(),
939 ));
940 }
941 ensure_external_caller_allowed(caller)?;
942 }
943 let result = self.executor.submit_envelope(envelope).await;
944 self.emit_commit_telemetry("commit_envelope", started, &result);
945 let result = result?;
946 self.dispatch_lifecycle_events_for_commit(result.commit_seq)
947 .await;
948 Ok(result)
949 }
950
951 pub async fn commit_envelope_with_finality(
952 &self,
953 envelope: TransactionEnvelope,
954 finality: CommitFinality,
955 ) -> Result<CommitResult, AedbError> {
956 let mut result = self.commit_envelope(envelope).await?;
957 self.enforce_finality(&mut result, finality).await?;
958 Ok(result)
959 }
960
    /// Blocks until `result.commit_seq` satisfies `finality`, then refreshes
    /// `result.durable_head_seq`. A no-op for `Visible` or when the commit is
    /// already durable.
    ///
    /// With coalesced durable acks in Batch mode, waiting is staged to avoid
    /// thundering-herd fsyncs: (1) sleep one coalesce window, (2) bounded wait
    /// up to a grace period, (3) one more bounded wait if the WAL synced very
    /// recently, (4) otherwise elect exactly one waiter to force an fsync
    /// while the rest wait normally.
    async fn enforce_finality(
        &self,
        result: &mut CommitResult,
        finality: CommitFinality,
    ) -> Result<(), AedbError> {
        if matches!(finality, CommitFinality::Durable)
            && result.durable_head_seq < result.commit_seq
        {
            let wait_started = Instant::now();
            if self._config.durable_ack_coalescing_enabled
                && matches!(self._config.durability_mode, DurabilityMode::Batch)
            {
                let window_us = self._config.durable_ack_coalesce_window_us;
                // Stage 1: give the batcher one coalesce window to sync.
                if window_us > 0 {
                    tokio::time::sleep(Duration::from_micros(window_us)).await;
                }
                if self.executor.durable_head_seq_now() < result.commit_seq {
                    // Grace period: at least the window, or two batch intervals.
                    let grace_wait_us = window_us.max(
                        self._config
                            .batch_interval_ms
                            .saturating_mul(1000)
                            .saturating_mul(2),
                    );
                    // Stage 2: bounded wait; a timeout here is not an error.
                    if grace_wait_us > 0 {
                        let _ = tokio::time::timeout(
                            Duration::from_micros(grace_wait_us),
                            self.wait_for_durable(result.commit_seq),
                        )
                        .await;
                    }

                    if self.executor.durable_head_seq_now() < result.commit_seq {
                        // Stage 3: if a WAL sync just happened, the next one is
                        // likely imminent — wait one more bounded round.
                        let recently_synced = grace_wait_us > 0
                            && self
                                .executor
                                .last_wal_sync_age_us()
                                .is_some_and(|age| age < grace_wait_us);
                        if recently_synced {
                            let _ = tokio::time::timeout(
                                Duration::from_micros(grace_wait_us),
                                self.wait_for_durable(result.commit_seq),
                            )
                            .await;
                        }
                    }

                    if self.executor.durable_head_seq_now() < result.commit_seq {
                        // Stage 4: leader election — exactly one waiter forces
                        // an fsync; everyone else waits for durability.
                        if self
                            .durable_ack_fsync_leader
                            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                            .is_ok()
                        {
                            // Guard releases leadership even if force_fsync
                            // errors out via `?`.
                            struct LeaderGuard<'a>(&'a AtomicBool);
                            impl Drop for LeaderGuard<'_> {
                                fn drop(&mut self) {
                                    self.0.store(false, Ordering::Release);
                                }
                            }
                            let _guard = LeaderGuard(&self.durable_ack_fsync_leader);
                            let _ = self.force_fsync().await?;
                        } else {
                            self.wait_for_durable(result.commit_seq).await?;
                        }
                    }
                }
            } else {
                // No coalescing: plain unbounded durable wait.
                self.wait_for_durable(result.commit_seq).await?;
            }
            self.durable_wait_ops.fetch_add(1, Ordering::Relaxed);
            self.durable_wait_micros
                .fetch_add(wait_started.elapsed().as_micros() as u64, Ordering::Relaxed);
            result.durable_head_seq = self.executor.durable_head_seq_now();
        }
        Ok(())
    }
1038
1039 async fn dispatch_lifecycle_events_for_commit(&self, commit_seq: u64) {
1040 if self.lifecycle_hooks.lock().is_empty() {
1041 return;
1042 }
1043 let events = match self.read_lifecycle_events_for_commit(commit_seq).await {
1044 Ok(events) => events,
1045 Err(err) => {
1046 warn!(commit_seq, error = ?err, "failed to read lifecycle outbox");
1047 return;
1048 }
1049 };
1050 self.dispatch_lifecycle_events(events);
1051 }
1052
    /// Loads the lifecycle events recorded for `commit_seq` from the system
    /// "lifecycle_outbox" table.
    ///
    /// Every missing precondition (table/schema/column/row absent) yields an
    /// empty event list rather than an error; only a present-but-undecodable
    /// JSON payload fails.
    async fn read_lifecycle_events_for_commit(
        &self,
        commit_seq: u64,
    ) -> Result<Vec<LifecycleEvent>, AedbError> {
        let ns = namespace_key(crate::catalog::SYSTEM_PROJECT_ID, "app");
        let table_key = (ns.clone(), "lifecycle_outbox".to_string());
        let (snapshot, catalog, _) = self.executor.snapshot_state().await;
        let Some(schema) = catalog.tables.get(&table_key) else {
            return Ok(Vec::new());
        };
        // Locate the "events" column index in the schema.
        let Some(events_idx) = schema.columns.iter().position(|c| c.name == "events") else {
            return Ok(Vec::new());
        };
        let Some(table) =
            snapshot.table(crate::catalog::SYSTEM_PROJECT_ID, "app", "lifecycle_outbox")
        else {
            return Ok(Vec::new());
        };
        // Outbox rows are keyed by the commit sequence.
        let encoded_pk = EncodedKey::from_values(&[Value::Integer(commit_seq as i64)]);
        let Some(row) = table.rows.get(&encoded_pk) else {
            return Ok(Vec::new());
        };
        let Some(Value::Json(events_json)) = row.values.get(events_idx) else {
            return Ok(Vec::new());
        };
        serde_json::from_str(events_json.as_str()).map_err(|e| AedbError::Decode(e.to_string()))
    }
1080
    /// Fans `events` out to all registered lifecycle hooks on a background
    /// task.
    ///
    /// Hooks are cloned out of the lock before spawning, so the lock is never
    /// held while hooks run. A panicking hook is caught and logged so one bad
    /// observer cannot take down the runtime or starve the remaining hooks.
    fn dispatch_lifecycle_events(&self, events: Vec<LifecycleEvent>) {
        if events.is_empty() {
            return;
        }
        let hooks = self.lifecycle_hooks.lock().clone();
        if hooks.is_empty() {
            return;
        }
        tokio::spawn(async move {
            for event in events {
                for hook in &hooks {
                    // AssertUnwindSafe: on panic we only log and move on; the
                    // closure's captures are not observed again afterwards.
                    if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        hook.on_event(&event)
                    }))
                    .is_err()
                    {
                        warn!("lifecycle hook panicked while handling event");
                    }
                }
            }
        });
    }
1103
1104 fn emit_query_telemetry(
1105 &self,
1106 started: Instant,
1107 project_id: &str,
1108 scope_id: &str,
1109 table: &str,
1110 snapshot_seq: u64,
1111 result: &Result<QueryResult, QueryError>,
1112 ) {
1113 let hooks = self.telemetry_hooks.lock().clone();
1114 if hooks.is_empty() {
1115 return;
1116 }
1117 let (rows_examined, ok, error) = match result {
1118 Ok(res) => (res.rows_examined, true, None),
1119 Err(err) => (0usize, false, Some(err.to_string())),
1120 };
1121 let event = QueryTelemetryEvent {
1122 project_id: project_id.to_string(),
1123 scope_id: scope_id.to_string(),
1124 table: table.to_string(),
1125 snapshot_seq,
1126 rows_examined,
1127 latency_micros: started.elapsed().as_micros() as u64,
1128 ok,
1129 error,
1130 };
1131 for hook in hooks {
1132 hook.on_query(&event);
1133 }
1134 }
1135
1136 fn emit_commit_telemetry(
1137 &self,
1138 op: &'static str,
1139 started: Instant,
1140 result: &Result<CommitResult, AedbError>,
1141 ) {
1142 let hooks = self.telemetry_hooks.lock().clone();
1143 if hooks.is_empty() {
1144 return;
1145 }
1146 let (commit_seq, durable_head_seq, ok, error) = match result {
1147 Ok(res) => (Some(res.commit_seq), Some(res.durable_head_seq), true, None),
1148 Err(err) => (None, None, false, Some(err.to_string())),
1149 };
1150 let event = CommitTelemetryEvent {
1151 op,
1152 commit_seq,
1153 durable_head_seq,
1154 latency_micros: started.elapsed().as_micros() as u64,
1155 ok,
1156 error,
1157 };
1158 for hook in hooks {
1159 hook.on_commit(&event);
1160 }
1161 }
1162
1163 pub async fn query(
1164 &self,
1165 project_id: &str,
1166 scope_id: &str,
1167 query: Query,
1168 ) -> Result<QueryResult, QueryError> {
1169 if self.require_authenticated_calls {
1170 return Err(QueryError::PermissionDenied {
1171 permission: "authenticated caller required in secure mode".into(),
1172 scope: "anonymous".into(),
1173 });
1174 }
1175 self.query_with_options_as(None, project_id, scope_id, query, QueryOptions::default())
1176 .await
1177 }
1178
1179 pub(crate) async fn query_unchecked(
1180 &self,
1181 project_id: &str,
1182 scope_id: &str,
1183 query: Query,
1184 options: QueryOptions,
1185 ) -> Result<QueryResult, QueryError> {
1186 self.query_with_options_as(None, project_id, scope_id, query, options)
1187 .await
1188 }
1189
1190 pub async fn query_no_auth(
1191 &self,
1192 project_id: &str,
1193 scope_id: &str,
1194 query: Query,
1195 options: QueryOptions,
1196 ) -> Result<QueryResult, QueryError> {
1197 if self.require_authenticated_calls {
1198 return Err(QueryError::PermissionDenied {
1199 permission: "query_no_auth is unavailable in secure mode".into(),
1200 scope: "anonymous".into(),
1201 });
1202 }
1203 self.query_unchecked(project_id, scope_id, query, options)
1204 .await
1205 }
1206
1207 pub async fn query_with_options(
1208 &self,
1209 project_id: &str,
1210 scope_id: &str,
1211 query: Query,
1212 options: QueryOptions,
1213 ) -> Result<QueryResult, QueryError> {
1214 if self.require_authenticated_calls {
1215 return Err(QueryError::PermissionDenied {
1216 permission: "authenticated caller required in secure mode".into(),
1217 scope: "anonymous".into(),
1218 });
1219 }
1220 self.query_with_options_as(None, project_id, scope_id, query, options)
1221 .await
1222 }
1223
    /// Core query entry point: authenticates the caller, resolves options,
    /// acquires a snapshot lease, and executes the query against it.
    ///
    /// # Errors
    /// `PermissionDenied` for anonymous callers in secure mode or disallowed
    /// callers; cursor-parse, snapshot-acquisition, and execution errors are
    /// all surfaced as `QueryError`.
    pub async fn query_with_options_as(
        &self,
        caller: Option<&CallerContext>,
        project_id: &str,
        scope_id: &str,
        query: Query,
        mut options: QueryOptions,
    ) -> Result<QueryResult, QueryError> {
        // Secure mode requires an authenticated caller context.
        if self.require_authenticated_calls && caller.is_none() {
            return Err(QueryError::PermissionDenied {
                permission: "authenticated caller required in secure mode".into(),
                scope: "anonymous".into(),
            });
        }
        if let Some(caller) = caller {
            ensure_query_caller_allowed(caller)?;
        }

        // Let the query's index hint flow into the options unless the caller
        // already selected an async index explicitly.
        if options.async_index.is_none() {
            options.async_index = query.use_index.clone();
        }

        // A pagination cursor pins the read to the snapshot it was issued at.
        if let Some(cursor) = &options.cursor {
            let token = parse_cursor_seq(cursor).map_err(QueryError::from)?;
            options.consistency = ConsistencyMode::AtSeq(token);
        }

        let lease = self
            .acquire_snapshot(options.consistency)
            .await
            .map_err(QueryError::from)?;
        let started = Instant::now();
        let table = query.table.clone();
        let result = execute_query_against_view(
            &lease.view,
            project_id,
            scope_id,
            query,
            &options,
            caller,
            self._config.max_scan_rows,
        );
        // Telemetry covers both success and failure outcomes.
        self.emit_query_telemetry(
            started,
            project_id,
            scope_id,
            &table,
            lease.view.seq,
            &result,
        );
        result
    }
1276
1277 pub async fn begin_read_tx(
1278 &self,
1279 consistency: ConsistencyMode,
1280 ) -> Result<ReadTx<'_>, AedbError> {
1281 if self.require_authenticated_calls {
1282 return Err(AedbError::PermissionDenied(
1283 "authenticated caller required in secure mode; use begin_read_tx_as".into(),
1284 ));
1285 }
1286 let lease = self.acquire_snapshot(consistency).await?;
1287 Ok(ReadTx {
1288 db: self,
1289 lease,
1290 caller: None,
1291 })
1292 }
1293
1294 pub async fn begin_read_tx_as(
1295 &self,
1296 caller: CallerContext,
1297 consistency: ConsistencyMode,
1298 ) -> Result<ReadTx<'_>, AedbError> {
1299 ensure_query_caller_allowed(&caller).map_err(|err| match err {
1300 QueryError::PermissionDenied { permission, .. } => {
1301 AedbError::PermissionDenied(permission)
1302 }
1303 other => AedbError::Validation(other.to_string()),
1304 })?;
1305 let lease = self.acquire_snapshot(consistency).await?;
1306 Ok(ReadTx {
1307 db: self,
1308 lease,
1309 caller: Some(caller),
1310 })
1311 }
1312
    /// Stable keyset pagination helper: injects a primary-key ordering when
    /// the query has none (and no joins), caps the page size, and threads the
    /// cursor through `query_with_options`.
    pub async fn query_page_stable(
        &self,
        project_id: &str,
        scope_id: &str,
        mut query: Query,
        cursor: Option<String>,
        page_size: usize,
        consistency: ConsistencyMode,
    ) -> Result<QueryResult, QueryError> {
        // Only inject a PK ordering for plain single-table queries; joins or
        // an explicit ORDER BY keep the caller's ordering.
        if query.joins.is_empty() && query.order_by.is_empty() {
            let (_, catalog, _) = self.executor.snapshot_state().await;
            let (q_project, q_scope, q_table) =
                resolve_query_table_ref(project_id, scope_id, &query.table);
            if let Some(schema) = catalog
                .tables
                .get(&(namespace_key(&q_project, &q_scope), q_table))
            {
                for pk in &schema.primary_key {
                    query = query.order_by(pk, Order::Asc);
                }
            }
        }
        // A zero page size would return nothing forever; clamp to at least 1.
        query.limit = Some(page_size.max(1));
        self.query_with_options(
            project_id,
            scope_id,
            query,
            QueryOptions {
                consistency,
                cursor,
                ..QueryOptions::default()
            },
        )
        .await
    }
1348
1349 pub async fn query_batch(
1350 &self,
1351 project_id: &str,
1352 scope_id: &str,
1353 items: Vec<QueryBatchItem>,
1354 consistency: ConsistencyMode,
1355 ) -> Result<Vec<QueryResult>, QueryError> {
1356 let tx = self
1357 .begin_read_tx(consistency)
1358 .await
1359 .map_err(QueryError::from)?;
1360 tx.query_batch(project_id, scope_id, items).await
1361 }
1362
1363 pub async fn query_batch_as(
1364 &self,
1365 caller: CallerContext,
1366 project_id: &str,
1367 scope_id: &str,
1368 items: Vec<QueryBatchItem>,
1369 consistency: ConsistencyMode,
1370 ) -> Result<Vec<QueryResult>, QueryError> {
1371 let tx = self
1372 .begin_read_tx_as(caller, consistency)
1373 .await
1374 .map_err(QueryError::from)?;
1375 tx.query_batch(project_id, scope_id, items).await
1376 }
1377
1378 pub async fn exists(
1379 &self,
1380 project_id: &str,
1381 scope_id: &str,
1382 query: Query,
1383 consistency: ConsistencyMode,
1384 ) -> Result<bool, QueryError> {
1385 let tx = self
1386 .begin_read_tx(consistency)
1387 .await
1388 .map_err(QueryError::from)?;
1389 tx.exists(project_id, scope_id, query).await
1390 }
1391
1392 pub async fn exists_as(
1393 &self,
1394 caller: CallerContext,
1395 project_id: &str,
1396 scope_id: &str,
1397 query: Query,
1398 consistency: ConsistencyMode,
1399 ) -> Result<bool, QueryError> {
1400 let tx = self
1401 .begin_read_tx_as(caller, consistency)
1402 .await
1403 .map_err(QueryError::from)?;
1404 tx.exists(project_id, scope_id, query).await
1405 }
1406
1407 pub async fn explain_query(
1408 &self,
1409 project_id: &str,
1410 scope_id: &str,
1411 query: Query,
1412 options: QueryOptions,
1413 ) -> Result<QueryDiagnostics, QueryError> {
1414 self.explain_query_as(None, project_id, scope_id, query, options)
1415 .await
1416 }
1417
    /// Produces planner diagnostics for `query` without executing it.
    ///
    /// In secure mode an authenticated caller is required, mirroring the
    /// guard in `query_with_options_as`.
    pub async fn explain_query_as(
        &self,
        caller: Option<&CallerContext>,
        project_id: &str,
        scope_id: &str,
        query: Query,
        options: QueryOptions,
    ) -> Result<QueryDiagnostics, QueryError> {
        if self.require_authenticated_calls && caller.is_none() {
            return Err(QueryError::PermissionDenied {
                permission: "authenticated caller required in secure mode".into(),
                scope: "anonymous".into(),
            });
        }

        // Explain runs against a real snapshot lease so the plan reflects the
        // catalog and data visible at the chosen consistency.
        let lease = self
            .acquire_snapshot(options.consistency)
            .await
            .map_err(QueryError::from)?;
        explain_query_against_view(
            &lease.view,
            project_id,
            scope_id,
            query,
            &options,
            caller,
            self._config.max_scan_rows,
        )
    }
1447
    /// Runs `explain_query` followed by `query_with_options` and bundles both
    /// results.
    ///
    /// NOTE(review): the explain and the execution each acquire their own
    /// snapshot lease, so under concurrent writes the diagnostics may describe
    /// a slightly different snapshot than the returned rows — confirm whether
    /// a single shared lease is required here.
    pub async fn query_with_diagnostics(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
        options: QueryOptions,
    ) -> Result<QueryWithDiagnosticsResult, QueryError> {
        let diagnostics = self
            .explain_query(project_id, scope_id, query.clone(), options.clone())
            .await?;
        let result = self
            .query_with_options(project_id, scope_id, query, options)
            .await?;
        Ok(QueryWithDiagnosticsResult {
            result,
            diagnostics,
        })
    }
1466
    /// Caller-scoped variant of `query_with_diagnostics`.
    ///
    /// NOTE(review): as with the anonymous variant, explain and execution use
    /// separate snapshot leases; diagnostics and rows may reflect different
    /// snapshots under concurrent writes — confirm if that is acceptable.
    pub async fn query_with_diagnostics_as(
        &self,
        caller: &CallerContext,
        project_id: &str,
        scope_id: &str,
        query: Query,
        options: QueryOptions,
    ) -> Result<QueryWithDiagnosticsResult, QueryError> {
        let diagnostics = self
            .explain_query_as(
                Some(caller),
                project_id,
                scope_id,
                query.clone(),
                options.clone(),
            )
            .await?;
        let result = self
            .query_with_options_as(Some(caller), project_id, scope_id, query, options)
            .await?;
        Ok(QueryWithDiagnosticsResult {
            result,
            diagnostics,
        })
    }
1492
1493 pub async fn list_with_total_with(
1494 &self,
1495 request: ListWithTotalRequest,
1496 ) -> Result<ListPageResult, QueryError> {
1497 let tx = self
1498 .begin_read_tx(request.consistency)
1499 .await
1500 .map_err(QueryError::from)?;
1501 tx.list_with_total(
1502 &request.project_id,
1503 &request.scope_id,
1504 request.query,
1505 request.cursor,
1506 request.offset,
1507 request.page_size,
1508 )
1509 .await
1510 }
1511
1512 pub async fn list_with_total_as_with(
1513 &self,
1514 caller: CallerContext,
1515 request: ListWithTotalRequest,
1516 ) -> Result<ListPageResult, QueryError> {
1517 let tx = self
1518 .begin_read_tx_as(caller, request.consistency)
1519 .await
1520 .map_err(QueryError::from)?;
1521 tx.list_with_total(
1522 &request.project_id,
1523 &request.scope_id,
1524 request.query,
1525 request.cursor,
1526 request.offset,
1527 request.page_size,
1528 )
1529 .await
1530 }
1531
1532 #[allow(clippy::too_many_arguments)]
1533 pub async fn list_with_total(
1534 &self,
1535 project_id: &str,
1536 scope_id: &str,
1537 query: Query,
1538 cursor: Option<String>,
1539 offset: Option<usize>,
1540 page_size: usize,
1541 consistency: ConsistencyMode,
1542 ) -> Result<ListPageResult, QueryError> {
1543 self.list_with_total_with(ListWithTotalRequest {
1544 project_id: project_id.to_string(),
1545 scope_id: scope_id.to_string(),
1546 query,
1547 cursor,
1548 offset,
1549 page_size,
1550 consistency,
1551 })
1552 .await
1553 }
1554
1555 #[allow(clippy::too_many_arguments)]
1556 pub async fn list_with_total_as(
1557 &self,
1558 caller: CallerContext,
1559 project_id: &str,
1560 scope_id: &str,
1561 query: Query,
1562 cursor: Option<String>,
1563 offset: Option<usize>,
1564 page_size: usize,
1565 consistency: ConsistencyMode,
1566 ) -> Result<ListPageResult, QueryError> {
1567 self.list_with_total_as_with(
1568 caller,
1569 ListWithTotalRequest {
1570 project_id: project_id.to_string(),
1571 scope_id: scope_id.to_string(),
1572 query,
1573 cursor,
1574 offset,
1575 page_size,
1576 consistency,
1577 },
1578 )
1579 .await
1580 }
1581
1582 pub async fn lookup_then_hydrate_with(
1583 &self,
1584 request: LookupThenHydrateRequest,
1585 ) -> Result<(QueryResult, QueryResult), QueryError> {
1586 let tx = self
1587 .begin_read_tx(request.consistency)
1588 .await
1589 .map_err(QueryError::from)?;
1590 tx.lookup_then_hydrate(
1591 &request.project_id,
1592 &request.scope_id,
1593 request.source_query,
1594 request.source_key_index,
1595 request.hydrate_query,
1596 &request.hydrate_key_column,
1597 )
1598 .await
1599 }
1600
1601 pub async fn lookup_then_hydrate_as_with(
1602 &self,
1603 caller: CallerContext,
1604 request: LookupThenHydrateRequest,
1605 ) -> Result<(QueryResult, QueryResult), QueryError> {
1606 let tx = self
1607 .begin_read_tx_as(caller, request.consistency)
1608 .await
1609 .map_err(QueryError::from)?;
1610 tx.lookup_then_hydrate(
1611 &request.project_id,
1612 &request.scope_id,
1613 request.source_query,
1614 request.source_key_index,
1615 request.hydrate_query,
1616 &request.hydrate_key_column,
1617 )
1618 .await
1619 }
1620
1621 #[allow(clippy::too_many_arguments)]
1622 pub async fn lookup_then_hydrate(
1623 &self,
1624 project_id: &str,
1625 scope_id: &str,
1626 source_query: Query,
1627 source_key_index: usize,
1628 hydrate_query: Query,
1629 hydrate_key_column: &str,
1630 consistency: ConsistencyMode,
1631 ) -> Result<(QueryResult, QueryResult), QueryError> {
1632 self.lookup_then_hydrate_with(LookupThenHydrateRequest {
1633 project_id: project_id.to_string(),
1634 scope_id: scope_id.to_string(),
1635 source_query,
1636 source_key_index,
1637 hydrate_query,
1638 hydrate_key_column: hydrate_key_column.to_string(),
1639 consistency,
1640 })
1641 .await
1642 }
1643
1644 #[allow(clippy::too_many_arguments)]
1645 pub async fn lookup_then_hydrate_as(
1646 &self,
1647 caller: CallerContext,
1648 project_id: &str,
1649 scope_id: &str,
1650 source_query: Query,
1651 source_key_index: usize,
1652 hydrate_query: Query,
1653 hydrate_key_column: &str,
1654 consistency: ConsistencyMode,
1655 ) -> Result<(QueryResult, QueryResult), QueryError> {
1656 self.lookup_then_hydrate_as_with(
1657 caller,
1658 LookupThenHydrateRequest {
1659 project_id: project_id.to_string(),
1660 scope_id: scope_id.to_string(),
1661 source_query,
1662 source_key_index,
1663 hydrate_query,
1664 hydrate_key_column: hydrate_key_column.to_string(),
1665 consistency,
1666 },
1667 )
1668 .await
1669 }
1670
1671 pub async fn query_sql_read_only(
1672 &self,
1673 adapter: &dyn ReadOnlySqlAdapter,
1674 project_id: &str,
1675 scope_id: &str,
1676 sql: &str,
1677 consistency: ConsistencyMode,
1678 ) -> Result<QueryResult, QueryError> {
1679 let (query, mut options) = adapter.execute_read_only(project_id, scope_id, sql)?;
1680 options.consistency = consistency;
1681 options.allow_full_scan = true;
1682 self.query_with_options(project_id, scope_id, query, options)
1683 .await
1684 }
1685
1686 pub async fn query_sql_read_only_as(
1687 &self,
1688 adapter: &dyn ReadOnlySqlAdapter,
1689 caller: &CallerContext,
1690 project_id: &str,
1691 scope_id: &str,
1692 sql: &str,
1693 consistency: ConsistencyMode,
1694 ) -> Result<QueryResult, QueryError> {
1695 let (query, mut options) = adapter.execute_read_only(project_id, scope_id, sql)?;
1696 options.consistency = consistency;
1697 options.allow_full_scan = true;
1698 self.query_with_options_as(Some(caller), project_id, scope_id, query, options)
1699 .await
1700 }
1701
1702 pub async fn plan_sql_transaction(
1703 &self,
1704 consistency: ConsistencyMode,
1705 mutations: Vec<Mutation>,
1706 ) -> Result<SqlTransactionPlan, AedbError> {
1707 if mutations.is_empty() {
1708 return Err(AedbError::Validation(
1709 "sql transaction plan requires at least one mutation".into(),
1710 ));
1711 }
1712 Ok(SqlTransactionPlan {
1713 base_seq: self.snapshot_probe(consistency).await?,
1714 caller: None,
1715 mutations,
1716 })
1717 }
1718
1719 pub async fn plan_sql_transaction_as(
1720 &self,
1721 caller: CallerContext,
1722 consistency: ConsistencyMode,
1723 mutations: Vec<Mutation>,
1724 ) -> Result<SqlTransactionPlan, AedbError> {
1725 if mutations.is_empty() {
1726 return Err(AedbError::Validation(
1727 "sql transaction plan requires at least one mutation".into(),
1728 ));
1729 }
1730 ensure_external_caller_allowed(&caller)?;
1731 Ok(SqlTransactionPlan {
1732 base_seq: self.snapshot_probe(consistency).await?,
1733 caller: Some(caller),
1734 mutations,
1735 })
1736 }
1737
1738 pub async fn commit_sql_transaction_plan(
1739 &self,
1740 plan: SqlTransactionPlan,
1741 ) -> Result<CommitResult, AedbError> {
1742 self.commit_envelope(TransactionEnvelope {
1743 caller: plan.caller,
1744 idempotency_key: None,
1745 write_class: WriteClass::Standard,
1746 assertions: Vec::new(),
1747 read_set: ReadSet::default(),
1748 write_intent: WriteIntent {
1749 mutations: plan.mutations,
1750 },
1751 base_seq: plan.base_seq,
1752 })
1753 .await
1754 }
1755
1756 pub async fn commit_sql_transaction(
1757 &self,
1758 consistency: ConsistencyMode,
1759 mutations: Vec<Mutation>,
1760 ) -> Result<CommitResult, AedbError> {
1761 let plan = self.plan_sql_transaction(consistency, mutations).await?;
1762 self.commit_sql_transaction_plan(plan).await
1763 }
1764
1765 pub async fn commit_sql_transaction_as(
1766 &self,
1767 caller: CallerContext,
1768 consistency: ConsistencyMode,
1769 mutations: Vec<Mutation>,
1770 ) -> Result<CommitResult, AedbError> {
1771 let plan = self
1772 .plan_sql_transaction_as(caller, consistency, mutations)
1773 .await?;
1774 self.commit_sql_transaction_plan(plan).await
1775 }
1776
1777 pub async fn commit_many_atomic(
1778 &self,
1779 mutations: Vec<Mutation>,
1780 ) -> Result<CommitResult, AedbError> {
1781 if mutations.is_empty() {
1782 return Err(AedbError::Validation(
1783 "transaction envelope has no mutations".into(),
1784 ));
1785 }
1786 self.commit_envelope(TransactionEnvelope {
1787 caller: None,
1788 idempotency_key: None,
1789 write_class: WriteClass::Standard,
1790 assertions: Vec::new(),
1791 read_set: ReadSet::default(),
1792 write_intent: WriteIntent { mutations },
1793 base_seq: 0,
1796 })
1797 .await
1798 }
1799
1800 pub async fn commit_many_atomic_as(
1801 &self,
1802 caller: CallerContext,
1803 mutations: Vec<Mutation>,
1804 ) -> Result<CommitResult, AedbError> {
1805 if mutations.is_empty() {
1806 return Err(AedbError::Validation(
1807 "transaction envelope has no mutations".into(),
1808 ));
1809 }
1810 self.commit_envelope(TransactionEnvelope {
1811 caller: Some(caller),
1812 idempotency_key: None,
1813 write_class: WriteClass::Standard,
1814 assertions: Vec::new(),
1815 read_set: ReadSet::default(),
1816 write_intent: WriteIntent { mutations },
1817 base_seq: 0,
1820 })
1821 .await
1822 }
1823
1824 pub async fn delete_where(
1825 &self,
1826 project_id: &str,
1827 scope_id: &str,
1828 table_name: &str,
1829 predicate: Expr,
1830 limit: Option<usize>,
1831 ) -> Result<Option<CommitResult>, AedbError> {
1832 let result = self
1833 .commit(Mutation::DeleteWhere {
1834 project_id: project_id.to_string(),
1835 scope_id: scope_id.to_string(),
1836 table_name: table_name.to_string(),
1837 predicate,
1838 limit,
1839 })
1840 .await?;
1841 Ok(Some(result))
1842 }
1843
1844 pub async fn delete_where_as(
1845 &self,
1846 caller: CallerContext,
1847 project_id: &str,
1848 scope_id: &str,
1849 table_name: &str,
1850 predicate: Expr,
1851 limit: Option<usize>,
1852 ) -> Result<Option<CommitResult>, AedbError> {
1853 let result = self
1854 .commit_as(
1855 caller,
1856 Mutation::DeleteWhere {
1857 project_id: project_id.to_string(),
1858 scope_id: scope_id.to_string(),
1859 table_name: table_name.to_string(),
1860 predicate,
1861 limit,
1862 },
1863 )
1864 .await?;
1865 Ok(Some(result))
1866 }
1867
1868 pub async fn update_where(
1869 &self,
1870 project_id: &str,
1871 scope_id: &str,
1872 table_name: &str,
1873 predicate: Expr,
1874 updates: Vec<(String, Value)>,
1875 limit: Option<usize>,
1876 ) -> Result<Option<CommitResult>, AedbError> {
1877 let result = self
1878 .commit(Mutation::UpdateWhere {
1879 project_id: project_id.to_string(),
1880 scope_id: scope_id.to_string(),
1881 table_name: table_name.to_string(),
1882 predicate,
1883 updates,
1884 limit,
1885 })
1886 .await?;
1887 Ok(Some(result))
1888 }
1889
1890 pub async fn update_where_as_with(
1891 &self,
1892 request: UpdateWhereRequest,
1893 ) -> Result<Option<CommitResult>, AedbError> {
1894 let result = self
1895 .commit_as(
1896 request.caller,
1897 Mutation::UpdateWhere {
1898 project_id: request.project_id,
1899 scope_id: request.scope_id,
1900 table_name: request.table_name,
1901 predicate: request.predicate,
1902 updates: request.updates,
1903 limit: request.limit,
1904 },
1905 )
1906 .await?;
1907 Ok(Some(result))
1908 }
1909
1910 #[allow(clippy::too_many_arguments)]
1911 pub async fn update_where_as(
1912 &self,
1913 caller: CallerContext,
1914 project_id: &str,
1915 scope_id: &str,
1916 table_name: &str,
1917 predicate: Expr,
1918 updates: Vec<(String, Value)>,
1919 limit: Option<usize>,
1920 ) -> Result<Option<CommitResult>, AedbError> {
1921 self.update_where_as_with(UpdateWhereRequest {
1922 caller,
1923 project_id: project_id.to_string(),
1924 scope_id: scope_id.to_string(),
1925 table_name: table_name.to_string(),
1926 predicate,
1927 updates,
1928 limit,
1929 })
1930 .await
1931 }
1932
1933 pub async fn update_where_expr(
1934 &self,
1935 project_id: &str,
1936 scope_id: &str,
1937 table_name: &str,
1938 predicate: Expr,
1939 updates: Vec<(String, TableUpdateExpr)>,
1940 limit: Option<usize>,
1941 ) -> Result<Option<CommitResult>, AedbError> {
1942 let result = self
1943 .commit(Mutation::UpdateWhereExpr {
1944 project_id: project_id.to_string(),
1945 scope_id: scope_id.to_string(),
1946 table_name: table_name.to_string(),
1947 predicate,
1948 updates,
1949 limit,
1950 })
1951 .await?;
1952 Ok(Some(result))
1953 }
1954
1955 pub async fn update_where_expr_as_with(
1956 &self,
1957 request: UpdateWhereExprRequest,
1958 ) -> Result<Option<CommitResult>, AedbError> {
1959 let result = self
1960 .commit_as(
1961 request.caller,
1962 Mutation::UpdateWhereExpr {
1963 project_id: request.project_id,
1964 scope_id: request.scope_id,
1965 table_name: request.table_name,
1966 predicate: request.predicate,
1967 updates: request.updates,
1968 limit: request.limit,
1969 },
1970 )
1971 .await?;
1972 Ok(Some(result))
1973 }
1974
1975 #[allow(clippy::too_many_arguments)]
1976 pub async fn update_where_expr_as(
1977 &self,
1978 caller: CallerContext,
1979 project_id: &str,
1980 scope_id: &str,
1981 table_name: &str,
1982 predicate: Expr,
1983 updates: Vec<(String, TableUpdateExpr)>,
1984 limit: Option<usize>,
1985 ) -> Result<Option<CommitResult>, AedbError> {
1986 self.update_where_expr_as_with(UpdateWhereExprRequest {
1987 caller,
1988 project_id: project_id.to_string(),
1989 scope_id: scope_id.to_string(),
1990 table_name: table_name.to_string(),
1991 predicate,
1992 updates,
1993 limit,
1994 })
1995 .await
1996 }
1997
1998 pub async fn mutate_where_returning(
1999 &self,
2000 project_id: &str,
2001 scope_id: &str,
2002 table_name: &str,
2003 predicate: Expr,
2004 updates: Vec<(String, TableUpdateExpr)>,
2005 ) -> Result<Option<MutateWhereReturningResult>, AedbError> {
2006 self.mutate_where_returning_inner(
2007 None, project_id, scope_id, table_name, predicate, updates,
2008 )
2009 .await
2010 }
2011
2012 pub async fn mutate_where_returning_as(
2013 &self,
2014 caller: CallerContext,
2015 project_id: &str,
2016 scope_id: &str,
2017 table_name: &str,
2018 predicate: Expr,
2019 updates: Vec<(String, TableUpdateExpr)>,
2020 ) -> Result<Option<MutateWhereReturningResult>, AedbError> {
2021 self.mutate_where_returning_inner(
2022 Some(caller),
2023 project_id,
2024 scope_id,
2025 table_name,
2026 predicate,
2027 updates,
2028 )
2029 .await
2030 }
2031
2032 pub async fn claim_one(
2033 &self,
2034 project_id: &str,
2035 scope_id: &str,
2036 table_name: &str,
2037 predicate: Expr,
2038 updates: Vec<(String, TableUpdateExpr)>,
2039 ) -> Result<Option<MutateWhereReturningResult>, AedbError> {
2040 self.mutate_where_returning(project_id, scope_id, table_name, predicate, updates)
2041 .await
2042 }
2043
2044 pub async fn claim_one_as(
2045 &self,
2046 caller: CallerContext,
2047 project_id: &str,
2048 scope_id: &str,
2049 table_name: &str,
2050 predicate: Expr,
2051 updates: Vec<(String, TableUpdateExpr)>,
2052 ) -> Result<Option<MutateWhereReturningResult>, AedbError> {
2053 self.mutate_where_returning_as(caller, project_id, scope_id, table_name, predicate, updates)
2054 .await
2055 }
2056
    /// Fetches a single KV entry after checking the caller's read permission
    /// for the exact key within the project/scope namespace.
    pub async fn kv_get(
        &self,
        project_id: &str,
        scope_id: &str,
        key: &[u8],
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<Option<KvEntry>, QueryError> {
        ensure_query_caller_allowed(caller)?;
        let lease = self
            .acquire_snapshot(consistency)
            .await
            .map_err(QueryError::from)?;
        let snapshot = &lease.view.keyspace;
        let catalog = &lease.view.catalog;
        // Permission is evaluated against the same snapshot the value is read
        // from, so the check and the read cannot diverge.
        if !catalog.has_kv_read_permission(&caller.caller_id, project_id, scope_id, key) {
            return Err(QueryError::PermissionDenied {
                permission: format!("KvRead({project_id}.{scope_id})"),
                scope: caller.caller_id.clone(),
            });
        }
        Ok(snapshot.kv_get(project_id, scope_id, key).cloned())
    }
2080
2081 pub async fn kv_get_default_scope(
2082 &self,
2083 project_id: &str,
2084 key: &[u8],
2085 consistency: ConsistencyMode,
2086 caller: &CallerContext,
2087 ) -> Result<Option<KvEntry>, QueryError> {
2088 self.kv_get(
2089 project_id,
2090 crate::catalog::DEFAULT_SCOPE_ID,
2091 key,
2092 consistency,
2093 caller,
2094 )
2095 .await
2096 }
2097
2098 pub(crate) async fn kv_get_unchecked(
2099 &self,
2100 project_id: &str,
2101 scope_id: &str,
2102 key: &[u8],
2103 consistency: ConsistencyMode,
2104 ) -> Result<Option<KvEntry>, QueryError> {
2105 let lease = self
2106 .acquire_snapshot(consistency)
2107 .await
2108 .map_err(QueryError::from)?;
2109 Ok(lease
2110 .view
2111 .keyspace
2112 .kv_get(project_id, scope_id, key)
2113 .cloned())
2114 }
2115
2116 pub async fn kv_get_no_auth(
2117 &self,
2118 project_id: &str,
2119 scope_id: &str,
2120 key: &[u8],
2121 consistency: ConsistencyMode,
2122 ) -> Result<Option<KvEntry>, QueryError> {
2123 if self.require_authenticated_calls {
2124 return Err(QueryError::PermissionDenied {
2125 permission: "kv_get_no_auth is unavailable in secure mode".into(),
2126 scope: "anonymous".into(),
2127 });
2128 }
2129 self.kv_get_unchecked(project_id, scope_id, key, consistency)
2130 .await
2131 }
2132
2133 pub(crate) async fn kv_scan_prefix_unchecked(
2134 &self,
2135 project_id: &str,
2136 scope_id: &str,
2137 prefix: &[u8],
2138 limit: u64,
2139 consistency: ConsistencyMode,
2140 ) -> Result<Vec<(Vec<u8>, KvEntry)>, QueryError> {
2141 let lease = self
2142 .acquire_snapshot(consistency)
2143 .await
2144 .map_err(QueryError::from)?;
2145 let page_size = limit.min(self._config.max_scan_rows as u64) as usize;
2146 let start_bound = Bound::Included(prefix.to_vec());
2147 let end_bound = next_prefix_bytes(prefix).map_or(Bound::Unbounded, Bound::Excluded);
2148 let ns = NamespaceId::project_scope(project_id, scope_id);
2149 let mut entries = Vec::new();
2150 if let Some(kv) = lease.view.keyspace.namespaces.get(&ns).map(|n| &n.kv) {
2151 for (k, v) in kv.entries.range((start_bound, end_bound)) {
2152 if !k.starts_with(prefix) {
2153 break;
2154 }
2155 entries.push((k.clone(), v.clone()));
2156 if entries.len() == page_size {
2157 break;
2158 }
2159 }
2160 }
2161 Ok(entries)
2162 }
2163
2164 pub async fn kv_scan_prefix_no_auth(
2165 &self,
2166 project_id: &str,
2167 scope_id: &str,
2168 prefix: &[u8],
2169 limit: u64,
2170 consistency: ConsistencyMode,
2171 ) -> Result<Vec<(Vec<u8>, KvEntry)>, QueryError> {
2172 if self.require_authenticated_calls {
2173 return Err(QueryError::PermissionDenied {
2174 permission: "kv_scan_prefix_no_auth is unavailable in secure mode".into(),
2175 scope: "anonymous".into(),
2176 });
2177 }
2178 self.kv_scan_prefix_unchecked(project_id, scope_id, prefix, limit, consistency)
2179 .await
2180 }
2181
    /// Permission-checked, cursor-paginated prefix scan over a scope's KV
    /// space.
    ///
    /// Pagination is stable: a cursor pins subsequent pages to the snapshot
    /// sequence of the first page, and a cursor whose sequence does not match
    /// the acquired snapshot is rejected. Entries outside the caller's
    /// allowed key prefixes are silently filtered out.
    #[allow(clippy::too_many_arguments)]
    pub async fn kv_scan_prefix(
        &self,
        project_id: &str,
        scope_id: &str,
        prefix: &[u8],
        limit: u64,
        cursor: Option<KvCursor>,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<KvScanResult, QueryError> {
        ensure_query_caller_allowed(caller)?;
        // A cursor overrides the requested consistency so every page reads
        // the same snapshot.
        let effective_consistency = if let Some(c) = &cursor {
            ConsistencyMode::AtSeq(c.snapshot_seq)
        } else {
            consistency
        };
        let lease = self
            .acquire_snapshot(effective_consistency)
            .await
            .map_err(QueryError::from)?;
        let snapshot = &lease.view.keyspace;
        let catalog = &lease.view.catalog;
        let snapshot_seq = lease.view.seq;
        // None here means no KvRead grant at all for this project/scope.
        let allowed_prefixes =
            match catalog.kv_read_prefixes_for_caller(&caller.caller_id, project_id, scope_id) {
                Some(prefixes) => prefixes,
                None => {
                    return Err(QueryError::PermissionDenied {
                        permission: format!("KvRead({project_id}.{scope_id})"),
                        scope: caller.caller_id.clone(),
                    });
                }
            };
        if let Some(c) = &cursor
            && c.snapshot_seq != snapshot_seq
        {
            return Err(QueryError::InvalidQuery {
                reason: "cursor snapshot_seq mismatch".into(),
            });
        }
        let page_size = limit.min(self._config.max_scan_rows as u64) as usize;
        // Resume strictly after the cursor's last key, or start at the prefix.
        let start_bound = cursor
            .as_ref()
            .map_or(Bound::Included(prefix.to_vec()), |c| {
                Bound::Excluded(c.last_key.clone())
            });
        let end_bound = next_prefix_bytes(prefix).map_or(Bound::Unbounded, Bound::Excluded);

        let ns = NamespaceId::project_scope(project_id, scope_id);
        let mut entries = Vec::new();
        if let Some(kv) = snapshot.namespaces.get(&ns).map(|n| &n.kv) {
            for (k, v) in kv.entries.range((start_bound, end_bound)) {
                if !k.starts_with(prefix) {
                    break;
                }
                // An empty prefix list grants the whole scope; otherwise the
                // key must fall under at least one allowed prefix.
                if !allowed_prefixes.is_empty()
                    && !allowed_prefixes
                        .iter()
                        .any(|allowed| k.starts_with(allowed))
                {
                    continue;
                }
                entries.push((k.clone(), v.clone()));
                // Deliberately collect page_size + 1 entries so truncation
                // (and hence the need for a next-page cursor) is detectable.
                if entries.len() > page_size {
                    break;
                }
            }
        }

        let truncated = entries.len() > page_size;
        if truncated {
            entries.truncate(page_size);
        }
        let next_cursor = if truncated {
            entries.last().map(|(k, _)| KvCursor {
                snapshot_seq,
                last_key: k.clone(),
                page_size: page_size as u64,
            })
        } else {
            None
        };
        Ok(KvScanResult {
            entries,
            cursor: next_cursor,
            snapshot_seq,
            truncated,
        })
    }
2272
    /// Convenience wrapper for [`Self::kv_scan_prefix`] that scans the
    /// project's default scope (`crate::catalog::DEFAULT_SCOPE_ID`).
    ///
    /// All pagination/consistency semantics are those of `kv_scan_prefix`.
    pub async fn kv_scan_prefix_default_scope(
        &self,
        project_id: &str,
        prefix: &[u8],
        limit: u64,
        cursor: Option<KvCursor>,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<KvScanResult, QueryError> {
        self.kv_scan_prefix(
            project_id,
            crate::catalog::DEFAULT_SCOPE_ID,
            prefix,
            limit,
            cursor,
            consistency,
            caller,
        )
        .await
    }
2293
    /// Paginated range scan over raw KV entries in `project_id`/`scope_id`.
    ///
    /// Pagination: pass back the returned `cursor` to fetch the next page.
    /// A cursor pins the snapshot: on continuation pages the requested
    /// `consistency` is overridden with `AtSeq(cursor.snapshot_seq)`, and a
    /// cursor whose seq does not match the acquired snapshot is rejected as
    /// an invalid query. When a cursor is supplied, the caller's `start`
    /// bound is ignored and the scan resumes after `cursor.last_key`.
    ///
    /// Keys outside the caller's granted read prefixes are silently skipped
    /// (an empty prefix list grants unrestricted reads within the scope).
    /// Page size is `limit` capped by `config.max_scan_rows`.
    #[allow(clippy::too_many_arguments)]
    pub async fn kv_scan_range(
        &self,
        project_id: &str,
        scope_id: &str,
        start: Bound<Vec<u8>>,
        end: Bound<Vec<u8>>,
        limit: u64,
        cursor: Option<KvCursor>,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<KvScanResult, QueryError> {
        ensure_query_caller_allowed(caller)?;
        // A cursor re-reads the exact snapshot it was created against.
        let effective_consistency = if let Some(c) = &cursor {
            ConsistencyMode::AtSeq(c.snapshot_seq)
        } else {
            consistency
        };
        let lease = self
            .acquire_snapshot(effective_consistency)
            .await
            .map_err(QueryError::from)?;
        let snapshot = &lease.view.keyspace;
        let catalog = &lease.view.catalog;
        let snapshot_seq = lease.view.seq;
        // `None` means no KvRead grant at all for this caller on the scope.
        let allowed_prefixes =
            match catalog.kv_read_prefixes_for_caller(&caller.caller_id, project_id, scope_id) {
                Some(prefixes) => prefixes,
                None => {
                    return Err(QueryError::PermissionDenied {
                        permission: format!("KvRead({project_id}.{scope_id})"),
                        scope: caller.caller_id.clone(),
                    });
                }
            };
        if let Some(c) = &cursor
            && c.snapshot_seq != snapshot_seq
        {
            return Err(QueryError::InvalidQuery {
                reason: "cursor snapshot_seq mismatch".into(),
            });
        }
        let page_size = limit.min(self._config.max_scan_rows as u64) as usize;
        // Resume strictly after the last returned key; otherwise honor the
        // caller-provided start bound.
        let adjusted_start = match (&cursor, start) {
            (Some(c), _) => Bound::Excluded(c.last_key.clone()),
            (None, b) => b,
        };

        let ns = NamespaceId::project_scope(project_id, scope_id);
        let mut entries = Vec::new();
        if let Some(kv) = snapshot.namespaces.get(&ns).map(|n| &n.kv) {
            for (k, v) in kv.entries.range((adjusted_start, end)) {
                if !allowed_prefixes.is_empty()
                    && !allowed_prefixes
                        .iter()
                        .any(|allowed| k.starts_with(allowed))
                {
                    continue;
                }
                entries.push((k.clone(), v.clone()));
                // Collect one extra entry so truncation can be detected.
                if entries.len() > page_size {
                    break;
                }
            }
        }

        let truncated = entries.len() > page_size;
        if truncated {
            entries.truncate(page_size);
        }
        let next_cursor = if truncated {
            entries.last().map(|(k, _)| KvCursor {
                snapshot_seq,
                last_key: k.clone(),
                page_size: page_size as u64,
            })
        } else {
            None
        };
        Ok(KvScanResult {
            entries,
            cursor: next_cursor,
            snapshot_seq,
            truncated,
        })
    }
2380
2381 pub async fn kv_scan_all_scopes(
2382 &self,
2383 project_id: &str,
2384 prefix: &[u8],
2385 limit: u64,
2386 consistency: ConsistencyMode,
2387 caller: &CallerContext,
2388 ) -> Result<Vec<ScopedKvEntry>, QueryError> {
2389 ensure_query_caller_allowed(caller)?;
2390 let (_, catalog, _) = self.executor.snapshot_state().await;
2391 let project_read = catalog.has_permission(
2392 &caller.caller_id,
2393 &Permission::KvRead {
2394 project_id: project_id.to_string(),
2395 scope_id: None,
2396 prefix: None,
2397 },
2398 ) || catalog.has_permission(
2399 &caller.caller_id,
2400 &Permission::ProjectAdmin {
2401 project_id: project_id.to_string(),
2402 },
2403 );
2404 if !project_read {
2405 return Err(QueryError::PermissionDenied {
2406 permission: format!("KvRead({project_id}.*)"),
2407 scope: caller.caller_id.clone(),
2408 });
2409 }
2410 let lease = self
2411 .acquire_snapshot(consistency)
2412 .await
2413 .map_err(QueryError::from)?;
2414 let snapshot = &lease.view.keyspace;
2415 let mut out = Vec::new();
2416 for (ns_id, ns) in snapshot.namespaces.iter() {
2417 let Some(ns_key) = ns_id.as_project_scope_key() else {
2418 continue;
2419 };
2420 let Some((p, scope)) = ns_key.split_once("::") else {
2421 continue;
2422 };
2423 if p != project_id {
2424 continue;
2425 }
2426 for (k, v) in ns.kv.entries.iter() {
2427 if !k.starts_with(prefix) {
2428 continue;
2429 }
2430 out.push(ScopedKvEntry {
2431 scope_id: scope.to_string(),
2432 key: k.clone(),
2433 value: v.value.clone(),
2434 version: v.version,
2435 });
2436 if out.len() >= limit as usize {
2437 return Ok(out);
2438 }
2439 }
2440 }
2441 Ok(out)
2442 }
2443
2444 pub async fn kv_set(
2445 &self,
2446 project_id: &str,
2447 scope_id: &str,
2448 key: Vec<u8>,
2449 value: Vec<u8>,
2450 ) -> Result<CommitResult, AedbError> {
2451 self.commit(Mutation::KvSet {
2452 project_id: project_id.to_string(),
2453 scope_id: scope_id.to_string(),
2454 key,
2455 value,
2456 })
2457 .await
2458 }
2459
    /// Convenience wrapper for [`Self::kv_set`] targeting the project's
    /// default scope.
    pub async fn kv_set_default_scope(
        &self,
        project_id: &str,
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> Result<CommitResult, AedbError> {
        self.kv_set(project_id, crate::catalog::DEFAULT_SCOPE_ID, key, value)
            .await
    }
2469
2470 pub async fn kv_set_as(
2471 &self,
2472 caller: CallerContext,
2473 project_id: &str,
2474 scope_id: &str,
2475 key: Vec<u8>,
2476 value: Vec<u8>,
2477 ) -> Result<CommitResult, AedbError> {
2478 self.commit_as(
2479 caller,
2480 Mutation::KvSet {
2481 project_id: project_id.to_string(),
2482 scope_id: scope_id.to_string(),
2483 key,
2484 value,
2485 },
2486 )
2487 .await
2488 }
2489
2490 pub async fn kv_del(
2491 &self,
2492 project_id: &str,
2493 scope_id: &str,
2494 key: Vec<u8>,
2495 ) -> Result<CommitResult, AedbError> {
2496 self.commit(Mutation::KvDel {
2497 project_id: project_id.to_string(),
2498 scope_id: scope_id.to_string(),
2499 key,
2500 })
2501 .await
2502 }
2503
    /// Convenience wrapper for [`Self::kv_del`] targeting the project's
    /// default scope.
    pub async fn kv_del_default_scope(
        &self,
        project_id: &str,
        key: Vec<u8>,
    ) -> Result<CommitResult, AedbError> {
        self.kv_del(project_id, crate::catalog::DEFAULT_SCOPE_ID, key)
            .await
    }
2512
2513 pub async fn kv_del_as(
2514 &self,
2515 caller: CallerContext,
2516 project_id: &str,
2517 scope_id: &str,
2518 key: Vec<u8>,
2519 ) -> Result<CommitResult, AedbError> {
2520 self.commit_as(
2521 caller,
2522 Mutation::KvDel {
2523 project_id: project_id.to_string(),
2524 scope_id: scope_id.to_string(),
2525 key,
2526 },
2527 )
2528 .await
2529 }
2530
2531 pub async fn kv_inc_u256(
2532 &self,
2533 project_id: &str,
2534 scope_id: &str,
2535 key: Vec<u8>,
2536 amount_be: [u8; 32],
2537 ) -> Result<CommitResult, AedbError> {
2538 self.commit(Mutation::KvIncU256 {
2539 project_id: project_id.to_string(),
2540 scope_id: scope_id.to_string(),
2541 key,
2542 amount_be,
2543 })
2544 .await
2545 }
2546
2547 pub async fn kv_inc_u256_as(
2548 &self,
2549 caller: CallerContext,
2550 project_id: &str,
2551 scope_id: &str,
2552 key: Vec<u8>,
2553 amount_be: [u8; 32],
2554 ) -> Result<CommitResult, AedbError> {
2555 self.commit_as(
2556 caller,
2557 Mutation::KvIncU256 {
2558 project_id: project_id.to_string(),
2559 scope_id: scope_id.to_string(),
2560 key,
2561 amount_be,
2562 },
2563 )
2564 .await
2565 }
2566
2567 pub async fn kv_dec_u256(
2568 &self,
2569 project_id: &str,
2570 scope_id: &str,
2571 key: Vec<u8>,
2572 amount_be: [u8; 32],
2573 ) -> Result<CommitResult, AedbError> {
2574 self.commit(Mutation::KvDecU256 {
2575 project_id: project_id.to_string(),
2576 scope_id: scope_id.to_string(),
2577 key,
2578 amount_be,
2579 })
2580 .await
2581 }
2582
2583 pub async fn kv_dec_u256_as(
2584 &self,
2585 caller: CallerContext,
2586 project_id: &str,
2587 scope_id: &str,
2588 key: Vec<u8>,
2589 amount_be: [u8; 32],
2590 ) -> Result<CommitResult, AedbError> {
2591 self.commit_as(
2592 caller,
2593 Mutation::KvDecU256 {
2594 project_id: project_id.to_string(),
2595 scope_id: scope_id.to_string(),
2596 key,
2597 amount_be,
2598 },
2599 )
2600 .await
2601 }
2602
2603 pub async fn kv_compare_and_swap(
2604 &self,
2605 project_id: &str,
2606 scope_id: &str,
2607 key: Vec<u8>,
2608 value: Vec<u8>,
2609 expected_seq: u64,
2610 ) -> Result<CommitResult, AedbError> {
2611 self.commit_envelope(TransactionEnvelope {
2612 caller: None,
2613 idempotency_key: None,
2614 write_class: crate::commit::tx::WriteClass::Standard,
2615 assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2616 project_id: project_id.to_string(),
2617 scope_id: scope_id.to_string(),
2618 key: key.clone(),
2619 expected_seq,
2620 }],
2621 read_set: crate::commit::tx::ReadSet::default(),
2622 write_intent: crate::commit::tx::WriteIntent {
2623 mutations: vec![Mutation::KvSet {
2624 project_id: project_id.to_string(),
2625 scope_id: scope_id.to_string(),
2626 key,
2627 value,
2628 }],
2629 },
2630 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2631 })
2632 .await
2633 }
2634
2635 pub async fn kv_compare_and_swap_as(
2636 &self,
2637 caller: CallerContext,
2638 project_id: &str,
2639 scope_id: &str,
2640 key: Vec<u8>,
2641 value: Vec<u8>,
2642 expected_seq: u64,
2643 ) -> Result<CommitResult, AedbError> {
2644 self.commit_envelope(TransactionEnvelope {
2645 caller: Some(caller),
2646 idempotency_key: None,
2647 write_class: crate::commit::tx::WriteClass::Standard,
2648 assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2649 project_id: project_id.to_string(),
2650 scope_id: scope_id.to_string(),
2651 key: key.clone(),
2652 expected_seq,
2653 }],
2654 read_set: crate::commit::tx::ReadSet::default(),
2655 write_intent: crate::commit::tx::WriteIntent {
2656 mutations: vec![Mutation::KvSet {
2657 project_id: project_id.to_string(),
2658 scope_id: scope_id.to_string(),
2659 key,
2660 value,
2661 }],
2662 },
2663 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2664 })
2665 .await
2666 }
2667
2668 pub async fn kv_compare_and_inc_u256(
2669 &self,
2670 project_id: &str,
2671 scope_id: &str,
2672 key: Vec<u8>,
2673 amount_be: [u8; 32],
2674 expected_seq: u64,
2675 ) -> Result<CommitResult, AedbError> {
2676 self.commit_envelope(TransactionEnvelope {
2677 caller: None,
2678 idempotency_key: None,
2679 write_class: crate::commit::tx::WriteClass::Standard,
2680 assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2681 project_id: project_id.to_string(),
2682 scope_id: scope_id.to_string(),
2683 key: key.clone(),
2684 expected_seq,
2685 }],
2686 read_set: crate::commit::tx::ReadSet::default(),
2687 write_intent: crate::commit::tx::WriteIntent {
2688 mutations: vec![Mutation::KvIncU256 {
2689 project_id: project_id.to_string(),
2690 scope_id: scope_id.to_string(),
2691 key,
2692 amount_be,
2693 }],
2694 },
2695 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2696 })
2697 .await
2698 }
2699
2700 pub async fn kv_compare_and_inc_u256_as(
2701 &self,
2702 caller: CallerContext,
2703 project_id: &str,
2704 scope_id: &str,
2705 key: Vec<u8>,
2706 amount_be: [u8; 32],
2707 expected_seq: u64,
2708 ) -> Result<CommitResult, AedbError> {
2709 self.commit_envelope(TransactionEnvelope {
2710 caller: Some(caller),
2711 idempotency_key: None,
2712 write_class: crate::commit::tx::WriteClass::Standard,
2713 assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2714 project_id: project_id.to_string(),
2715 scope_id: scope_id.to_string(),
2716 key: key.clone(),
2717 expected_seq,
2718 }],
2719 read_set: crate::commit::tx::ReadSet::default(),
2720 write_intent: crate::commit::tx::WriteIntent {
2721 mutations: vec![Mutation::KvIncU256 {
2722 project_id: project_id.to_string(),
2723 scope_id: scope_id.to_string(),
2724 key,
2725 amount_be,
2726 }],
2727 },
2728 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2729 })
2730 .await
2731 }
2732
2733 pub async fn kv_compare_and_dec_u256(
2734 &self,
2735 project_id: &str,
2736 scope_id: &str,
2737 key: Vec<u8>,
2738 amount_be: [u8; 32],
2739 expected_seq: u64,
2740 ) -> Result<CommitResult, AedbError> {
2741 self.commit_envelope(TransactionEnvelope {
2742 caller: None,
2743 idempotency_key: None,
2744 write_class: crate::commit::tx::WriteClass::Standard,
2745 assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2746 project_id: project_id.to_string(),
2747 scope_id: scope_id.to_string(),
2748 key: key.clone(),
2749 expected_seq,
2750 }],
2751 read_set: crate::commit::tx::ReadSet::default(),
2752 write_intent: crate::commit::tx::WriteIntent {
2753 mutations: vec![Mutation::KvDecU256 {
2754 project_id: project_id.to_string(),
2755 scope_id: scope_id.to_string(),
2756 key,
2757 amount_be,
2758 }],
2759 },
2760 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2761 })
2762 .await
2763 }
2764
2765 pub async fn kv_compare_and_dec_u256_as(
2766 &self,
2767 caller: CallerContext,
2768 project_id: &str,
2769 scope_id: &str,
2770 key: Vec<u8>,
2771 amount_be: [u8; 32],
2772 expected_seq: u64,
2773 ) -> Result<CommitResult, AedbError> {
2774 self.commit_envelope(TransactionEnvelope {
2775 caller: Some(caller),
2776 idempotency_key: None,
2777 write_class: crate::commit::tx::WriteClass::Standard,
2778 assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2779 project_id: project_id.to_string(),
2780 scope_id: scope_id.to_string(),
2781 key: key.clone(),
2782 expected_seq,
2783 }],
2784 read_set: crate::commit::tx::ReadSet::default(),
2785 write_intent: crate::commit::tx::WriteIntent {
2786 mutations: vec![Mutation::KvDecU256 {
2787 project_id: project_id.to_string(),
2788 scope_id: scope_id.to_string(),
2789 key,
2790 amount_be,
2791 }],
2792 },
2793 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2794 })
2795 .await
2796 }
2797
2798 pub async fn table_inc_u256(
2799 &self,
2800 project_id: &str,
2801 scope_id: &str,
2802 table_name: &str,
2803 primary_key: Vec<Value>,
2804 column: &str,
2805 amount_be: [u8; 32],
2806 ) -> Result<CommitResult, AedbError> {
2807 self.commit(Mutation::TableIncU256 {
2808 project_id: project_id.to_string(),
2809 scope_id: scope_id.to_string(),
2810 table_name: table_name.to_string(),
2811 primary_key,
2812 column: column.to_string(),
2813 amount_be,
2814 })
2815 .await
2816 }
2817
    /// Caller-attributed table U256 increment, taking all parameters bundled
    /// in a [`TableU256MutationRequest`] (avoids a long argument list).
    pub async fn table_inc_u256_as_with(
        &self,
        request: TableU256MutationRequest,
    ) -> Result<CommitResult, AedbError> {
        self.commit_as(
            request.caller,
            Mutation::TableIncU256 {
                project_id: request.project_id,
                scope_id: request.scope_id,
                table_name: request.table_name,
                primary_key: request.primary_key,
                column: request.column,
                amount_be: request.amount_be,
            },
        )
        .await
    }
2835
    /// Flat-argument convenience wrapper for [`Self::table_inc_u256_as_with`].
    #[allow(clippy::too_many_arguments)]
    pub async fn table_inc_u256_as(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        table_name: &str,
        primary_key: Vec<Value>,
        column: &str,
        amount_be: [u8; 32],
    ) -> Result<CommitResult, AedbError> {
        self.table_inc_u256_as_with(TableU256MutationRequest {
            caller,
            project_id: project_id.to_string(),
            scope_id: scope_id.to_string(),
            table_name: table_name.to_string(),
            primary_key,
            column: column.to_string(),
            amount_be,
        })
        .await
    }
2858
2859 pub async fn table_dec_u256(
2860 &self,
2861 project_id: &str,
2862 scope_id: &str,
2863 table_name: &str,
2864 primary_key: Vec<Value>,
2865 column: &str,
2866 amount_be: [u8; 32],
2867 ) -> Result<CommitResult, AedbError> {
2868 self.commit(Mutation::TableDecU256 {
2869 project_id: project_id.to_string(),
2870 scope_id: scope_id.to_string(),
2871 table_name: table_name.to_string(),
2872 primary_key,
2873 column: column.to_string(),
2874 amount_be,
2875 })
2876 .await
2877 }
2878
    /// Caller-attributed table U256 decrement, taking all parameters bundled
    /// in a [`TableU256MutationRequest`] (avoids a long argument list).
    pub async fn table_dec_u256_as_with(
        &self,
        request: TableU256MutationRequest,
    ) -> Result<CommitResult, AedbError> {
        self.commit_as(
            request.caller,
            Mutation::TableDecU256 {
                project_id: request.project_id,
                scope_id: request.scope_id,
                table_name: request.table_name,
                primary_key: request.primary_key,
                column: request.column,
                amount_be: request.amount_be,
            },
        )
        .await
    }
2896
    /// Flat-argument convenience wrapper for [`Self::table_dec_u256_as_with`].
    #[allow(clippy::too_many_arguments)]
    pub async fn table_dec_u256_as(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        table_name: &str,
        primary_key: Vec<Value>,
        column: &str,
        amount_be: [u8; 32],
    ) -> Result<CommitResult, AedbError> {
        self.table_dec_u256_as_with(TableU256MutationRequest {
            caller,
            project_id: project_id.to_string(),
            scope_id: scope_id.to_string(),
            table_name: table_name.to_string(),
            primary_key,
            column: column.to_string(),
            amount_be,
        })
        .await
    }
2919
    /// Submits a new order as the internal caller.
    ///
    /// High-reject-risk orders (post-only / FOK / market) are preflighted
    /// first so expected rejections fail fast without a commit attempt; the
    /// mutation is then validated against the current catalog before the
    /// pre-validated commit path is taken.
    pub async fn order_book_new(
        &self,
        project_id: &str,
        scope_id: &str,
        request: OrderRequest,
    ) -> Result<CommitResult, AedbError> {
        self.preflight_order_book_new_if_high_reject_risk(None, project_id, scope_id, &request)
            .await?;
        let mutation = Mutation::OrderBookNew {
            project_id: project_id.to_string(),
            scope_id: scope_id.to_string(),
            request,
        };
        // Validate eagerly so the commit path can skip re-validation.
        let (_, catalog, _) = self.executor.snapshot_state().await;
        validate_mutation_with_config(&catalog, &mutation, &self._config)?;
        self.commit_prevalidated_internal("order_book_new", mutation)
            .await
    }
2938
    /// Like [`Self::order_book_new`], but additionally waits for the commit
    /// to reach the requested `finality` level before returning.
    pub async fn order_book_new_with_finality(
        &self,
        project_id: &str,
        scope_id: &str,
        request: OrderRequest,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        let mut result = self.order_book_new(project_id, scope_id, request).await?;
        self.enforce_finality(&mut result, finality).await?;
        Ok(result)
    }
2950
2951 pub async fn order_book_define_table(
2952 &self,
2953 project_id: &str,
2954 scope_id: &str,
2955 table_id: &str,
2956 mode: OrderBookTableMode,
2957 ) -> Result<CommitResult, AedbError> {
2958 self.commit_prevalidated_internal(
2959 "order_book_define_table",
2960 Mutation::OrderBookDefineTable {
2961 project_id: project_id.to_string(),
2962 scope_id: scope_id.to_string(),
2963 table_id: table_id.to_string(),
2964 mode,
2965 },
2966 )
2967 .await
2968 }
2969
    /// Caller-attributed variant of [`Self::order_book_define_table`].
    pub async fn order_book_define_table_as(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        table_id: &str,
        mode: OrderBookTableMode,
    ) -> Result<CommitResult, AedbError> {
        self.commit_as(
            caller,
            Mutation::OrderBookDefineTable {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                table_id: table_id.to_string(),
                mode,
            },
        )
        .await
    }
2989
2990 pub async fn order_book_drop_table(
2991 &self,
2992 project_id: &str,
2993 scope_id: &str,
2994 table_id: &str,
2995 ) -> Result<CommitResult, AedbError> {
2996 self.commit_prevalidated_internal(
2997 "order_book_drop_table",
2998 Mutation::OrderBookDropTable {
2999 project_id: project_id.to_string(),
3000 scope_id: scope_id.to_string(),
3001 table_id: table_id.to_string(),
3002 },
3003 )
3004 .await
3005 }
3006
    /// Caller-attributed variant of [`Self::order_book_drop_table`].
    pub async fn order_book_drop_table_as(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        table_id: &str,
    ) -> Result<CommitResult, AedbError> {
        self.commit_as(
            caller,
            Mutation::OrderBookDropTable {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                table_id: table_id.to_string(),
            },
        )
        .await
    }
3024
    /// Sets the trading configuration for `instrument`, as the internal
    /// caller.
    pub async fn order_book_set_instrument_config(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        config: InstrumentConfig,
    ) -> Result<CommitResult, AedbError> {
        self.commit_prevalidated_internal(
            "order_book_set_instrument_config",
            Mutation::OrderBookSetInstrumentConfig {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                instrument: instrument.to_string(),
                config,
            },
        )
        .await
    }
3043
    /// Caller-attributed variant of [`Self::order_book_set_instrument_config`].
    pub async fn order_book_set_instrument_config_as(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        config: InstrumentConfig,
    ) -> Result<CommitResult, AedbError> {
        self.commit_as(
            caller,
            Mutation::OrderBookSetInstrumentConfig {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                instrument: instrument.to_string(),
                config,
            },
        )
        .await
    }
3063
    /// Halts (`halted == true`) or resumes trading for `instrument`, as the
    /// internal caller.
    pub async fn order_book_set_instrument_halted(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        halted: bool,
    ) -> Result<CommitResult, AedbError> {
        self.commit_prevalidated_internal(
            "order_book_set_instrument_halted",
            Mutation::OrderBookSetInstrumentHalted {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                instrument: instrument.to_string(),
                halted,
            },
        )
        .await
    }
3082
    /// Caller-attributed variant of [`Self::order_book_set_instrument_halted`].
    pub async fn order_book_set_instrument_halted_as(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        halted: bool,
    ) -> Result<CommitResult, AedbError> {
        self.commit_as(
            caller,
            Mutation::OrderBookSetInstrumentHalted {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                instrument: instrument.to_string(),
                halted,
            },
        )
        .await
    }
3102
    /// Submits a new order into a table-scoped instrument: the request's
    /// `instrument` field is overwritten with the composite id produced by
    /// `scoped_instrument(table_id, asset_id)` before delegating to
    /// [`Self::order_book_new`].
    pub async fn order_book_new_in_table(
        &self,
        project_id: &str,
        scope_id: &str,
        table_id: &str,
        asset_id: &str,
        mut request: OrderRequest,
    ) -> Result<CommitResult, AedbError> {
        request.instrument = scoped_instrument(table_id, asset_id);
        self.order_book_new(project_id, scope_id, request).await
    }
3114
    /// Caller-attributed order submission. High-reject-risk orders
    /// (post-only / FOK / market) are preflighted with the caller's
    /// permissions before the authorized commit is attempted.
    pub async fn order_book_new_as(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        request: OrderRequest,
    ) -> Result<CommitResult, AedbError> {
        self.preflight_order_book_new_if_high_reject_risk(
            Some(&caller),
            project_id,
            scope_id,
            &request,
        )
        .await?;
        self.commit_as(
            caller,
            Mutation::OrderBookNew {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                request,
            },
        )
        .await
    }
3139
    /// Like [`Self::order_book_new_as`], but additionally waits for the
    /// commit to reach the requested `finality` level before returning.
    pub async fn order_book_new_as_with_finality(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        request: OrderRequest,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        let mut result = self
            .order_book_new_as(caller, project_id, scope_id, request)
            .await?;
        self.enforce_finality(&mut result, finality).await?;
        Ok(result)
    }
3154
    /// Returns true for order shapes with elevated rejection risk — post-only
    /// execution, fill-or-kill, or market orders — which are preflighted
    /// before commit to fail fast.
    fn should_preflight_order_book_new(request: &OrderRequest) -> bool {
        request.exec_instructions.post_only()
            || matches!(request.time_in_force, TimeInForce::Fok)
            || matches!(request.order_type, OrderType::Market)
    }
3160
    /// Dry-runs an `OrderBookNew` mutation when the order shape is
    /// high-reject-risk (see [`Self::should_preflight_order_book_new`]);
    /// otherwise a no-op. A preflight rejection increments the
    /// `upstream_validation_rejections` counter and is surfaced as a
    /// validation error, so callers never pay for a doomed commit.
    ///
    /// With a `caller` the permission-checked `preflight_as` path is used
    /// (fallible, hence `?`); without one the internal `preflight` path is
    /// used, which returns a `PreflightResult` directly.
    async fn preflight_order_book_new_if_high_reject_risk(
        &self,
        caller: Option<&CallerContext>,
        project_id: &str,
        scope_id: &str,
        request: &OrderRequest,
    ) -> Result<(), AedbError> {
        if !Self::should_preflight_order_book_new(request) {
            return Ok(());
        }
        let mutation = Mutation::OrderBookNew {
            project_id: project_id.to_string(),
            scope_id: scope_id.to_string(),
            request: request.clone(),
        };
        let preflight_result = if let Some(caller) = caller {
            self.preflight_as(caller, mutation).await?
        } else {
            self.preflight(mutation).await
        };
        if let PreflightResult::Err { reason } = preflight_result {
            self.upstream_validation_rejections
                .fetch_add(1, Ordering::Relaxed);
            return Err(AedbError::Validation(reason));
        }
        Ok(())
    }
3188
    /// Cancels order `order_id` on `instrument` for `owner`, as the internal
    /// caller. Best-effort: the cancel mutation is committed without first
    /// checking that the order exists or is still open (contrast with
    /// [`Self::order_book_cancel_strict`]).
    pub async fn order_book_cancel(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
    ) -> Result<CommitResult, AedbError> {
        self.commit_prevalidated_internal(
            "order_book_cancel",
            Mutation::OrderBookCancel {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                instrument: instrument.to_string(),
                order_id,
                client_order_id: None,
                owner: owner.to_string(),
            },
        )
        .await
    }
3210
    /// Like [`Self::order_book_cancel`], but the commit additionally waits
    /// for the requested `finality` level.
    pub async fn order_book_cancel_with_finality(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        self.commit_prevalidated_internal_with_finality(
            "order_book_cancel",
            Mutation::OrderBookCancel {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                instrument: instrument.to_string(),
                order_id,
                client_order_id: None,
                owner: owner.to_string(),
            },
            finality,
        )
        .await
    }
3234
    /// Strict cancel as the internal caller: fails up front if the order is
    /// missing, owned by someone else, or no longer cancellable. See
    /// [`Self::order_book_cancel_strict_as_internal`] for the checks.
    pub async fn order_book_cancel_strict(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        self.order_book_cancel_strict_as_internal(
            None, project_id, scope_id, instrument, order_id, owner, finality,
        )
        .await
    }
3249
    /// Caller-attributed best-effort cancel of order `order_id` (no
    /// existence/ownership pre-checks; see the strict variants for those).
    pub async fn order_book_cancel_as(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
    ) -> Result<CommitResult, AedbError> {
        self.commit_as(
            caller,
            Mutation::OrderBookCancel {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                instrument: instrument.to_string(),
                order_id,
                client_order_id: None,
                owner: owner.to_string(),
            },
        )
        .await
    }
3272
    /// Like [`Self::order_book_cancel_as`], but the commit additionally
    /// waits for the requested `finality` level.
    #[allow(clippy::too_many_arguments)]
    pub async fn order_book_cancel_as_with_finality(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        let mut result = self
            .order_book_cancel_as(caller, project_id, scope_id, instrument, order_id, owner)
            .await?;
        self.enforce_finality(&mut result, finality).await?;
        Ok(result)
    }
3290
    /// Caller-attributed strict cancel: fails up front if the order is
    /// missing, owned by someone else, or no longer cancellable.
    #[allow(clippy::too_many_arguments)]
    pub async fn order_book_cancel_strict_as(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        self.order_book_cancel_strict_as_internal(
            Some(caller),
            project_id,
            scope_id,
            instrument,
            order_id,
            owner,
            finality,
        )
        .await
    }
3313
    /// Shared implementation of the strict-cancel paths.
    ///
    /// Reads the order at the latest snapshot and rejects before committing
    /// when: the order key is absent (validation error), the stored owner
    /// differs from `owner` (permission denied), or the order is neither
    /// `Open` nor `PartiallyFilled` / has zero remaining quantity
    /// (validation error). The order key read is recorded in the envelope's
    /// read set with its observed version, so the commit fails if the order
    /// changes between this check and application (optimistic concurrency).
    #[allow(clippy::too_many_arguments)]
    async fn order_book_cancel_strict_as_internal(
        &self,
        caller: Option<CallerContext>,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        let lease = self.acquire_snapshot(ConsistencyMode::AtLatest).await?;
        let order_key = key_order(instrument, order_id);
        let Some(entry) = lease.view.keyspace.kv_get(project_id, scope_id, &order_key) else {
            return Err(AedbError::Validation(format!(
                "strict cancel target not found: order_id={order_id}"
            )));
        };
        // Order records are stored MessagePack-encoded.
        let order: OrderRecord =
            rmp_serde::from_slice(&entry.value).map_err(|e| AedbError::Decode(e.to_string()))?;
        if order.owner != owner {
            return Err(AedbError::PermissionDenied(
                "order ownership mismatch".into(),
            ));
        }
        if !matches!(
            order.status,
            crate::order_book::OrderStatus::Open | crate::order_book::OrderStatus::PartiallyFilled
        ) || u256_from_be(order.remaining_qty_be).is_zero()
        {
            return Err(AedbError::Validation(format!(
                "order not cancellable in current status: {:?}",
                order.status
            )));
        }
        let envelope = TransactionEnvelope {
            caller,
            idempotency_key: None,
            write_class: WriteClass::Standard,
            assertions: Vec::new(),
            // Pin the order's observed version so a concurrent fill/cancel
            // invalidates this commit instead of racing it.
            read_set: ReadSet {
                points: vec![ReadSetEntry {
                    key: ReadKey::KvKey {
                        project_id: project_id.to_string(),
                        scope_id: scope_id.to_string(),
                        key: order_key,
                    },
                    version_at_read: entry.version,
                }],
                ranges: Vec::new(),
            },
            write_intent: WriteIntent {
                mutations: vec![Mutation::OrderBookCancel {
                    project_id: project_id.to_string(),
                    scope_id: scope_id.to_string(),
                    instrument: instrument.to_string(),
                    order_id,
                    client_order_id: None,
                    owner: owner.to_string(),
                }],
            },
            base_seq: lease.view.seq,
        };
        self.commit_envelope_with_finality(envelope, finality).await
    }
3379
    /// Best-effort cancel addressed by the caller-assigned client order id
    /// rather than the engine-assigned `order_id` (which is sent as 0 and
    /// resolved downstream), as the internal caller.
    pub async fn order_book_cancel_by_client_id(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        client_order_id: &str,
        owner: &str,
    ) -> Result<CommitResult, AedbError> {
        self.commit_prevalidated_internal(
            "order_book_cancel_by_client_id",
            Mutation::OrderBookCancel {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                instrument: instrument.to_string(),
                order_id: 0,
                client_order_id: Some(client_order_id.to_string()),
                owner: owner.to_string(),
            },
        )
        .await
    }
3401
    /// Caller-attributed variant of [`Self::order_book_cancel_by_client_id`].
    pub async fn order_book_cancel_by_client_id_as(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        client_order_id: &str,
        owner: &str,
    ) -> Result<CommitResult, AedbError> {
        self.commit_as(
            caller,
            Mutation::OrderBookCancel {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                instrument: instrument.to_string(),
                order_id: 0,
                client_order_id: Some(client_order_id.to_string()),
                owner: owner.to_string(),
            },
        )
        .await
    }
3424
    /// Strict client-id cancel as the internal caller: resolves the client
    /// order id to an order and fails up front if it is missing, owned by
    /// someone else, or no longer cancellable.
    pub async fn order_book_cancel_by_client_id_strict(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        client_order_id: &str,
        owner: &str,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        self.order_book_cancel_by_client_id_strict_as_internal(
            None,
            project_id,
            scope_id,
            instrument,
            client_order_id,
            owner,
            finality,
        )
        .await
    }
3445
    /// Caller-attributed variant of
    /// [`Self::order_book_cancel_by_client_id_strict`].
    #[allow(clippy::too_many_arguments)]
    pub async fn order_book_cancel_by_client_id_strict_as(
        &self,
        caller: CallerContext,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        client_order_id: &str,
        owner: &str,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        self.order_book_cancel_by_client_id_strict_as_internal(
            Some(caller),
            project_id,
            scope_id,
            instrument,
            client_order_id,
            owner,
            finality,
        )
        .await
    }
3468
    /// Strict cancel by client order id: resolves the client-id mapping and the
    /// order record under a snapshot lease, validates ownership and
    /// cancellability up front, then commits the cancel with both keys pinned in
    /// the read set so the commit is rejected if either moved since the snapshot.
    #[allow(clippy::too_many_arguments)]
    async fn order_book_cancel_by_client_id_strict_as_internal(
        &self,
        caller: Option<CallerContext>,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        client_order_id: &str,
        owner: &str,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        // Read at the latest committed state; its seq becomes the envelope base.
        let lease = self.acquire_snapshot(ConsistencyMode::AtLatest).await?;
        let cid_key = key_client_id(instrument, owner, client_order_id);
        let Some(cid_entry) = lease.view.keyspace.kv_get(project_id, scope_id, &cid_key) else {
            return Err(AedbError::Validation(format!(
                "strict cancel target not found: client_order_id={client_order_id}"
            )));
        };
        // The mapping value must be the order id as exactly 8 big-endian bytes.
        if cid_entry.value.len() != 8 {
            return Err(AedbError::Validation(
                "invalid client-order mapping encoding".into(),
            ));
        }
        let mut id_bytes = [0u8; 8];
        id_bytes.copy_from_slice(&cid_entry.value);
        let order_id = u64::from_be_bytes(id_bytes);
        let order_key = key_order(instrument, order_id);
        let Some(order_entry) = lease.view.keyspace.kv_get(project_id, scope_id, &order_key) else {
            return Err(AedbError::Validation(format!(
                "strict cancel target not found: order_id={order_id}"
            )));
        };
        let order: OrderRecord = rmp_serde::from_slice(&order_entry.value)
            .map_err(|e| AedbError::Decode(e.to_string()))?;
        // Only the order's owner may cancel it.
        if order.owner != owner {
            return Err(AedbError::PermissionDenied(
                "order ownership mismatch".into(),
            ));
        }
        // Cancellable only while Open/PartiallyFilled with quantity remaining.
        if !matches!(
            order.status,
            crate::order_book::OrderStatus::Open | crate::order_book::OrderStatus::PartiallyFilled
        ) || u256_from_be(order.remaining_qty_be).is_zero()
        {
            return Err(AedbError::Validation(format!(
                "order not cancellable in current status: {:?}",
                order.status
            )));
        }
        let envelope = TransactionEnvelope {
            caller,
            idempotency_key: None,
            write_class: WriteClass::Standard,
            assertions: Vec::new(),
            // Pin the versions of both keys read above: if either changes before
            // commit, the transaction aborts instead of cancelling a stale target.
            read_set: ReadSet {
                points: vec![
                    ReadSetEntry {
                        key: ReadKey::KvKey {
                            project_id: project_id.to_string(),
                            scope_id: scope_id.to_string(),
                            key: cid_key,
                        },
                        version_at_read: cid_entry.version,
                    },
                    ReadSetEntry {
                        key: ReadKey::KvKey {
                            project_id: project_id.to_string(),
                            scope_id: scope_id.to_string(),
                            key: order_key,
                        },
                        version_at_read: order_entry.version,
                    },
                ],
                ranges: Vec::new(),
            },
            write_intent: WriteIntent {
                // order_id 0 is a placeholder: resolution happens via
                // client_order_id at apply time, mirroring the non-strict path.
                mutations: vec![Mutation::OrderBookCancel {
                    project_id: project_id.to_string(),
                    scope_id: scope_id.to_string(),
                    instrument: instrument.to_string(),
                    order_id: 0,
                    client_order_id: Some(client_order_id.to_string()),
                    owner: owner.to_string(),
                }],
            },
            base_seq: lease.view.seq,
        };
        self.commit_envelope_with_finality(envelope, finality).await
    }
3558
3559 #[allow(clippy::too_many_arguments)]
3560 pub async fn order_book_cancel_replace_strict(
3561 &self,
3562 project_id: &str,
3563 scope_id: &str,
3564 instrument: &str,
3565 order_id: u64,
3566 owner: &str,
3567 new_price_ticks: Option<i64>,
3568 new_qty_be: Option<[u8; 32]>,
3569 new_time_in_force: Option<TimeInForce>,
3570 new_exec_instructions: Option<ExecInstruction>,
3571 finality: CommitFinality,
3572 ) -> Result<CommitResult, AedbError> {
3573 self.order_book_cancel_replace_strict_as_internal(
3574 None,
3575 project_id,
3576 scope_id,
3577 instrument,
3578 order_id,
3579 owner,
3580 new_price_ticks,
3581 new_qty_be,
3582 new_time_in_force,
3583 new_exec_instructions,
3584 finality,
3585 )
3586 .await
3587 }
3588
3589 #[allow(clippy::too_many_arguments)]
3590 pub async fn order_book_cancel_replace_strict_as(
3591 &self,
3592 caller: CallerContext,
3593 project_id: &str,
3594 scope_id: &str,
3595 instrument: &str,
3596 order_id: u64,
3597 owner: &str,
3598 new_price_ticks: Option<i64>,
3599 new_qty_be: Option<[u8; 32]>,
3600 new_time_in_force: Option<TimeInForce>,
3601 new_exec_instructions: Option<ExecInstruction>,
3602 finality: CommitFinality,
3603 ) -> Result<CommitResult, AedbError> {
3604 self.order_book_cancel_replace_strict_as_internal(
3605 Some(caller),
3606 project_id,
3607 scope_id,
3608 instrument,
3609 order_id,
3610 owner,
3611 new_price_ticks,
3612 new_qty_be,
3613 new_time_in_force,
3614 new_exec_instructions,
3615 finality,
3616 )
3617 .await
3618 }
3619
    /// Strict cancel/replace: verifies the order is owned by `owner` and still
    /// mutable, then commits a CancelReplace guarded by the order's version at
    /// read time so a concurrent modification aborts the commit.
    #[allow(clippy::too_many_arguments)]
    async fn order_book_cancel_replace_strict_as_internal(
        &self,
        caller: Option<CallerContext>,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
        new_price_ticks: Option<i64>,
        new_qty_be: Option<[u8; 32]>,
        new_time_in_force: Option<TimeInForce>,
        new_exec_instructions: Option<ExecInstruction>,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        // Shared pre-validation: yields the encoded order key, its version at
        // read time, and the snapshot seq to base the envelope on.
        let (order_key, version, base_seq) = self
            .order_book_strict_cancellable_version(
                project_id, scope_id, instrument, order_id, owner,
            )
            .await?;
        let envelope = TransactionEnvelope {
            caller,
            idempotency_key: None,
            write_class: WriteClass::Standard,
            assertions: Vec::new(),
            // Pin the order's version: commit fails if it changed since the read.
            read_set: ReadSet {
                points: vec![ReadSetEntry {
                    key: ReadKey::KvKey {
                        project_id: project_id.to_string(),
                        scope_id: scope_id.to_string(),
                        key: order_key,
                    },
                    version_at_read: version,
                }],
                ranges: Vec::new(),
            },
            write_intent: WriteIntent {
                mutations: vec![Mutation::OrderBookCancelReplace {
                    project_id: project_id.to_string(),
                    scope_id: scope_id.to_string(),
                    instrument: instrument.to_string(),
                    order_id,
                    owner: owner.to_string(),
                    new_price_ticks,
                    new_qty_be,
                    new_time_in_force,
                    new_exec_instructions,
                }],
            },
            base_seq,
        };
        self.commit_envelope_with_finality(envelope, finality).await
    }
3673
3674 #[allow(clippy::too_many_arguments)]
3675 pub async fn order_book_reduce_strict(
3676 &self,
3677 project_id: &str,
3678 scope_id: &str,
3679 instrument: &str,
3680 order_id: u64,
3681 owner: &str,
3682 reduce_by_be: [u8; 32],
3683 finality: CommitFinality,
3684 ) -> Result<CommitResult, AedbError> {
3685 self.order_book_reduce_strict_as_internal(
3686 None,
3687 project_id,
3688 scope_id,
3689 instrument,
3690 order_id,
3691 owner,
3692 reduce_by_be,
3693 finality,
3694 )
3695 .await
3696 }
3697
3698 #[allow(clippy::too_many_arguments)]
3699 pub async fn order_book_reduce_strict_as(
3700 &self,
3701 caller: CallerContext,
3702 project_id: &str,
3703 scope_id: &str,
3704 instrument: &str,
3705 order_id: u64,
3706 owner: &str,
3707 reduce_by_be: [u8; 32],
3708 finality: CommitFinality,
3709 ) -> Result<CommitResult, AedbError> {
3710 self.order_book_reduce_strict_as_internal(
3711 Some(caller),
3712 project_id,
3713 scope_id,
3714 instrument,
3715 order_id,
3716 owner,
3717 reduce_by_be,
3718 finality,
3719 )
3720 .await
3721 }
3722
    /// Strict quantity reduction: rejects zero amounts, verifies the order is
    /// owned by `owner` and still mutable, then commits an OrderBookReduce
    /// guarded by the order's version at read time.
    #[allow(clippy::too_many_arguments)]
    async fn order_book_reduce_strict_as_internal(
        &self,
        caller: Option<CallerContext>,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
        reduce_by_be: [u8; 32],
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        // A zero reduction would be a no-op; reject it up front.
        let reduce_by = u256_from_be(reduce_by_be);
        if reduce_by.is_zero() {
            return Err(AedbError::Validation(
                "strict reduce requires reduce_by > 0".into(),
            ));
        }
        // Shared pre-validation: order key, version at read, and snapshot seq.
        let (order_key, version, base_seq) = self
            .order_book_strict_cancellable_version(
                project_id, scope_id, instrument, order_id, owner,
            )
            .await?;
        let envelope = TransactionEnvelope {
            caller,
            idempotency_key: None,
            write_class: WriteClass::Standard,
            assertions: Vec::new(),
            // Pin the order's version: commit fails if it changed since the read.
            read_set: ReadSet {
                points: vec![ReadSetEntry {
                    key: ReadKey::KvKey {
                        project_id: project_id.to_string(),
                        scope_id: scope_id.to_string(),
                        key: order_key,
                    },
                    version_at_read: version,
                }],
                ranges: Vec::new(),
            },
            write_intent: WriteIntent {
                mutations: vec![Mutation::OrderBookReduce {
                    project_id: project_id.to_string(),
                    scope_id: scope_id.to_string(),
                    instrument: instrument.to_string(),
                    order_id,
                    owner: owner.to_string(),
                    reduce_by_be,
                }],
            },
            base_seq,
        };
        self.commit_envelope_with_finality(envelope, finality).await
    }
3776
    /// Loads the order under a fresh snapshot and checks that it exists, is
    /// owned by `owner`, and is still mutable (Open or PartiallyFilled with
    /// remaining quantity).
    ///
    /// Returns `(encoded order key, version at read time, snapshot seq)` for
    /// the caller to build a version-guarded transaction envelope.
    async fn order_book_strict_cancellable_version(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
    ) -> Result<(Vec<u8>, u64, u64), AedbError> {
        let lease = self.acquire_snapshot(ConsistencyMode::AtLatest).await?;
        let order_key = key_order(instrument, order_id);
        let Some(entry) = lease.view.keyspace.kv_get(project_id, scope_id, &order_key) else {
            return Err(AedbError::Validation(format!(
                "strict target not found: order_id={order_id}"
            )));
        };
        let order: OrderRecord =
            rmp_serde::from_slice(&entry.value).map_err(|e| AedbError::Decode(e.to_string()))?;
        // Only the order's owner may mutate it.
        if order.owner != owner {
            return Err(AedbError::PermissionDenied(
                "order ownership mismatch".into(),
            ));
        }
        // Mutable only while Open/PartiallyFilled with quantity remaining.
        if !matches!(
            order.status,
            crate::order_book::OrderStatus::Open | crate::order_book::OrderStatus::PartiallyFilled
        ) || u256_from_be(order.remaining_qty_be).is_zero()
        {
            return Err(AedbError::Validation(format!(
                "order not mutable in current status: {:?}",
                order.status
            )));
        }
        Ok((order_key, entry.version, lease.view.seq))
    }
3811
3812 #[allow(clippy::too_many_arguments)]
3813 pub async fn order_book_cancel_replace(
3814 &self,
3815 project_id: &str,
3816 scope_id: &str,
3817 instrument: &str,
3818 order_id: u64,
3819 owner: &str,
3820 new_price_ticks: Option<i64>,
3821 new_qty_be: Option<[u8; 32]>,
3822 new_time_in_force: Option<TimeInForce>,
3823 new_exec_instructions: Option<ExecInstruction>,
3824 ) -> Result<CommitResult, AedbError> {
3825 self.commit_prevalidated_internal(
3826 "order_book_cancel_replace",
3827 Mutation::OrderBookCancelReplace {
3828 project_id: project_id.to_string(),
3829 scope_id: scope_id.to_string(),
3830 instrument: instrument.to_string(),
3831 order_id,
3832 owner: owner.to_string(),
3833 new_price_ticks,
3834 new_qty_be,
3835 new_time_in_force,
3836 new_exec_instructions,
3837 },
3838 )
3839 .await
3840 }
3841
3842 #[allow(clippy::too_many_arguments)]
3843 pub async fn order_book_cancel_replace_as(
3844 &self,
3845 caller: CallerContext,
3846 project_id: &str,
3847 scope_id: &str,
3848 instrument: &str,
3849 order_id: u64,
3850 owner: &str,
3851 new_price_ticks: Option<i64>,
3852 new_qty_be: Option<[u8; 32]>,
3853 new_time_in_force: Option<TimeInForce>,
3854 new_exec_instructions: Option<ExecInstruction>,
3855 ) -> Result<CommitResult, AedbError> {
3856 self.commit_as(
3857 caller,
3858 Mutation::OrderBookCancelReplace {
3859 project_id: project_id.to_string(),
3860 scope_id: scope_id.to_string(),
3861 instrument: instrument.to_string(),
3862 order_id,
3863 owner: owner.to_string(),
3864 new_price_ticks,
3865 new_qty_be,
3866 new_time_in_force,
3867 new_exec_instructions,
3868 },
3869 )
3870 .await
3871 }
3872
3873 #[allow(clippy::too_many_arguments)]
3874 pub async fn order_book_mass_cancel(
3875 &self,
3876 project_id: &str,
3877 scope_id: &str,
3878 instrument: &str,
3879 owner: &str,
3880 side: Option<OrderSide>,
3881 owner_filter: Option<String>,
3882 price_range_ticks: Option<(i64, i64)>,
3883 ) -> Result<CommitResult, AedbError> {
3884 self.commit_prevalidated_internal(
3885 "order_book_mass_cancel",
3886 Mutation::OrderBookMassCancel {
3887 project_id: project_id.to_string(),
3888 scope_id: scope_id.to_string(),
3889 instrument: instrument.to_string(),
3890 owner: owner.to_string(),
3891 side,
3892 owner_filter,
3893 price_range_ticks,
3894 },
3895 )
3896 .await
3897 }
3898
3899 #[allow(clippy::too_many_arguments)]
3900 pub async fn order_book_mass_cancel_as(
3901 &self,
3902 caller: CallerContext,
3903 project_id: &str,
3904 scope_id: &str,
3905 instrument: &str,
3906 owner: &str,
3907 side: Option<OrderSide>,
3908 owner_filter: Option<String>,
3909 price_range_ticks: Option<(i64, i64)>,
3910 ) -> Result<CommitResult, AedbError> {
3911 self.commit_as(
3912 caller,
3913 Mutation::OrderBookMassCancel {
3914 project_id: project_id.to_string(),
3915 scope_id: scope_id.to_string(),
3916 instrument: instrument.to_string(),
3917 owner: owner.to_string(),
3918 side,
3919 owner_filter,
3920 price_range_ticks,
3921 },
3922 )
3923 .await
3924 }
3925
3926 pub async fn order_book_reduce(
3927 &self,
3928 project_id: &str,
3929 scope_id: &str,
3930 instrument: &str,
3931 order_id: u64,
3932 owner: &str,
3933 reduce_by_be: [u8; 32],
3934 ) -> Result<CommitResult, AedbError> {
3935 self.commit_prevalidated_internal(
3936 "order_book_reduce",
3937 Mutation::OrderBookReduce {
3938 project_id: project_id.to_string(),
3939 scope_id: scope_id.to_string(),
3940 instrument: instrument.to_string(),
3941 order_id,
3942 owner: owner.to_string(),
3943 reduce_by_be,
3944 },
3945 )
3946 .await
3947 }
3948
3949 #[allow(clippy::too_many_arguments)]
3950 pub async fn order_book_reduce_as(
3951 &self,
3952 caller: CallerContext,
3953 project_id: &str,
3954 scope_id: &str,
3955 instrument: &str,
3956 order_id: u64,
3957 owner: &str,
3958 reduce_by_be: [u8; 32],
3959 ) -> Result<CommitResult, AedbError> {
3960 self.commit_as(
3961 caller,
3962 Mutation::OrderBookReduce {
3963 project_id: project_id.to_string(),
3964 scope_id: scope_id.to_string(),
3965 instrument: instrument.to_string(),
3966 order_id,
3967 owner: owner.to_string(),
3968 reduce_by_be,
3969 },
3970 )
3971 .await
3972 }
3973
3974 pub async fn order_book_match_internal(
3975 &self,
3976 project_id: &str,
3977 scope_id: &str,
3978 instrument: &str,
3979 fills: Vec<FillSpec>,
3980 ) -> Result<CommitResult, AedbError> {
3981 self.commit_prevalidated_internal(
3982 "order_book_match_internal",
3983 Mutation::OrderBookMatch {
3984 project_id: project_id.to_string(),
3985 scope_id: scope_id.to_string(),
3986 instrument: instrument.to_string(),
3987 fills,
3988 },
3989 )
3990 .await
3991 }
3992
3993 pub async fn order_book_match_internal_as(
3994 &self,
3995 caller: CallerContext,
3996 project_id: &str,
3997 scope_id: &str,
3998 instrument: &str,
3999 fills: Vec<FillSpec>,
4000 ) -> Result<CommitResult, AedbError> {
4001 self.commit_as(
4002 caller,
4003 Mutation::OrderBookMatch {
4004 project_id: project_id.to_string(),
4005 scope_id: scope_id.to_string(),
4006 instrument: instrument.to_string(),
4007 fills,
4008 },
4009 )
4010 .await
4011 }
4012
    /// Returns the top `depth` levels of the order book for `instrument`,
    /// read at the requested `consistency`.
    ///
    /// The caller must hold KV read permission on the instrument's
    /// `ob:{instrument}:` key prefix within the project/scope.
    pub async fn order_book_top_n(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        depth: u32,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<OrderBookDepth, QueryError> {
        ensure_query_caller_allowed(caller)?;
        let lease = self
            .acquire_snapshot(consistency)
            .await
            .map_err(QueryError::from)?;
        // All order-book keys for an instrument live under this prefix.
        let prefix = format!("ob:{instrument}:");
        if !lease.view.catalog.has_kv_read_permission(
            &caller.caller_id,
            project_id,
            scope_id,
            prefix.as_bytes(),
        ) {
            return Err(QueryError::PermissionDenied {
                permission: format!("KvRead({project_id}.{scope_id})"),
                scope: caller.caller_id.clone(),
            });
        }
        read_top_n(
            &lease.view.keyspace,
            project_id,
            scope_id,
            instrument,
            depth as usize,
            lease.view.seq,
        )
        .map_err(QueryError::from)
    }
4049
    /// Looks up a single order record by id, or `None` if it does not exist.
    ///
    /// Requires KV read permission on the instrument's key prefix; in addition,
    /// a found order is only returned to its owner or to a GlobalAdmin caller.
    pub async fn order_status(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<Option<OrderRecord>, QueryError> {
        ensure_query_caller_allowed(caller)?;
        let lease = self
            .acquire_snapshot(consistency)
            .await
            .map_err(QueryError::from)?;
        // All order-book keys for an instrument live under this prefix.
        let prefix = format!("ob:{instrument}:");
        if !lease.view.catalog.has_kv_read_permission(
            &caller.caller_id,
            project_id,
            scope_id,
            prefix.as_bytes(),
        ) {
            return Err(QueryError::PermissionDenied {
                permission: format!("KvRead({project_id}.{scope_id})"),
                scope: caller.caller_id.clone(),
            });
        }
        let order = read_order_status(
            &lease.view.keyspace,
            project_id,
            scope_id,
            instrument,
            order_id,
        )
        .map_err(QueryError::from)?;
        // Owner-privacy gate: non-admin callers may only see their own orders.
        if let Some(order) = &order {
            let admin = lease
                .view
                .catalog
                .has_permission(&caller.caller_id, &Permission::GlobalAdmin);
            if !admin && order.owner != caller.caller_id {
                return Err(QueryError::PermissionDenied {
                    permission: "order_status(owner match)".into(),
                    scope: caller.caller_id.clone(),
                });
            }
        }
        Ok(order)
    }
4098
    /// Lists `owner`'s open orders on `instrument`.
    ///
    /// Requires KV read permission on the instrument's key prefix; in addition,
    /// non-admin callers may only query their own orders (caller_id == owner).
    pub async fn open_orders(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        owner: &str,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<Vec<OrderRecord>, QueryError> {
        ensure_query_caller_allowed(caller)?;
        let lease = self
            .acquire_snapshot(consistency)
            .await
            .map_err(QueryError::from)?;
        // All order-book keys for an instrument live under this prefix.
        let prefix = format!("ob:{instrument}:");
        if !lease.view.catalog.has_kv_read_permission(
            &caller.caller_id,
            project_id,
            scope_id,
            prefix.as_bytes(),
        ) {
            return Err(QueryError::PermissionDenied {
                permission: format!("KvRead({project_id}.{scope_id})"),
                scope: caller.caller_id.clone(),
            });
        }
        // Owner-privacy gate: checked before reading any order data.
        let admin = lease
            .view
            .catalog
            .has_permission(&caller.caller_id, &Permission::GlobalAdmin);
        if !admin && owner != caller.caller_id {
            return Err(QueryError::PermissionDenied {
                permission: "open_orders(owner match)".into(),
                scope: caller.caller_id.clone(),
            });
        }
        read_open_orders(
            &lease.view.keyspace,
            project_id,
            scope_id,
            instrument,
            owner,
        )
        .map_err(QueryError::from)
    }
4144
    /// Returns up to `limit` recent fills for `instrument`.
    ///
    /// Requires KV read permission on the instrument's `ob:{instrument}:` prefix.
    pub async fn recent_trades(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        limit: u32,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<Vec<crate::order_book::FillRecord>, QueryError> {
        ensure_query_caller_allowed(caller)?;
        let lease = self
            .acquire_snapshot(consistency)
            .await
            .map_err(QueryError::from)?;
        // All order-book keys for an instrument live under this prefix.
        let prefix = format!("ob:{instrument}:");
        if !lease.view.catalog.has_kv_read_permission(
            &caller.caller_id,
            project_id,
            scope_id,
            prefix.as_bytes(),
        ) {
            return Err(QueryError::PermissionDenied {
                permission: format!("KvRead({project_id}.{scope_id})"),
                scope: caller.caller_id.clone(),
            });
        }
        read_recent_trades(
            &lease.view.keyspace,
            project_id,
            scope_id,
            instrument,
            limit as usize,
        )
        .map_err(QueryError::from)
    }
4180
    /// Returns the current best-bid/best-ask spread for `instrument`.
    ///
    /// Requires KV read permission on the instrument's `ob:{instrument}:` prefix.
    pub async fn spread(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<Spread, QueryError> {
        ensure_query_caller_allowed(caller)?;
        let lease = self
            .acquire_snapshot(consistency)
            .await
            .map_err(QueryError::from)?;
        // All order-book keys for an instrument live under this prefix.
        let prefix = format!("ob:{instrument}:");
        if !lease.view.catalog.has_kv_read_permission(
            &caller.caller_id,
            project_id,
            scope_id,
            prefix.as_bytes(),
        ) {
            return Err(QueryError::PermissionDenied {
                permission: format!("KvRead({project_id}.{scope_id})"),
                scope: caller.caller_id.clone(),
            });
        }
        read_spread(
            &lease.view.keyspace,
            project_id,
            scope_id,
            instrument,
            lease.view.seq,
        )
        .map_err(QueryError::from)
    }
4215
    /// Returns the most recent execution report for `instrument`, if any.
    ///
    /// Requires KV read permission on the instrument's `ob:{instrument}:` prefix.
    pub async fn order_book_last_execution_report(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<Option<crate::order_book::ExecutionReport>, QueryError> {
        ensure_query_caller_allowed(caller)?;
        let lease = self
            .acquire_snapshot(consistency)
            .await
            .map_err(QueryError::from)?;
        // All order-book keys for an instrument live under this prefix.
        let prefix = format!("ob:{instrument}:");
        if !lease.view.catalog.has_kv_read_permission(
            &caller.caller_id,
            project_id,
            scope_id,
            prefix.as_bytes(),
        ) {
            return Err(QueryError::PermissionDenied {
                permission: format!("KvRead({project_id}.{scope_id})"),
                scope: caller.caller_id.clone(),
            });
        }
        read_last_execution_report(&lease.view.keyspace, project_id, scope_id, instrument)
            .map_err(QueryError::from)
    }
4244
4245 pub async fn compare_and_swap(
4246 &self,
4247 project_id: &str,
4248 scope_id: &str,
4249 table_name: &str,
4250 primary_key: Vec<Value>,
4251 row: Row,
4252 expected_seq: u64,
4253 ) -> Result<CommitResult, AedbError> {
4254 self.commit_envelope(TransactionEnvelope {
4255 caller: None,
4256 idempotency_key: None,
4257 write_class: crate::commit::tx::WriteClass::Standard,
4258 assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4259 project_id: project_id.to_string(),
4260 scope_id: scope_id.to_string(),
4261 table_name: table_name.to_string(),
4262 primary_key: primary_key.clone(),
4263 expected_seq,
4264 }],
4265 read_set: crate::commit::tx::ReadSet::default(),
4266 write_intent: crate::commit::tx::WriteIntent {
4267 mutations: vec![Mutation::Upsert {
4268 project_id: project_id.to_string(),
4269 scope_id: scope_id.to_string(),
4270 table_name: table_name.to_string(),
4271 primary_key,
4272 row,
4273 }],
4274 },
4275 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4276 })
4277 .await
4278 }
4279
4280 pub async fn compare_and_swap_as_with(
4281 &self,
4282 request: CompareAndSwapRequest,
4283 ) -> Result<CommitResult, AedbError> {
4284 self.commit_envelope(TransactionEnvelope {
4285 caller: Some(request.caller),
4286 idempotency_key: None,
4287 write_class: crate::commit::tx::WriteClass::Standard,
4288 assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4289 project_id: request.project_id.clone(),
4290 scope_id: request.scope_id.clone(),
4291 table_name: request.table_name.clone(),
4292 primary_key: request.primary_key.clone(),
4293 expected_seq: request.expected_seq,
4294 }],
4295 read_set: crate::commit::tx::ReadSet::default(),
4296 write_intent: crate::commit::tx::WriteIntent {
4297 mutations: vec![Mutation::Upsert {
4298 project_id: request.project_id,
4299 scope_id: request.scope_id,
4300 table_name: request.table_name,
4301 primary_key: request.primary_key,
4302 row: request.row,
4303 }],
4304 },
4305 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4306 })
4307 .await
4308 }
4309
4310 #[allow(clippy::too_many_arguments)]
4311 pub async fn compare_and_swap_as(
4312 &self,
4313 caller: CallerContext,
4314 project_id: &str,
4315 scope_id: &str,
4316 table_name: &str,
4317 primary_key: Vec<Value>,
4318 row: Row,
4319 expected_seq: u64,
4320 ) -> Result<CommitResult, AedbError> {
4321 self.compare_and_swap_as_with(CompareAndSwapRequest {
4322 caller,
4323 project_id: project_id.to_string(),
4324 scope_id: scope_id.to_string(),
4325 table_name: table_name.to_string(),
4326 primary_key,
4327 row,
4328 expected_seq,
4329 })
4330 .await
4331 }
4332
4333 #[allow(clippy::too_many_arguments)]
4334 pub async fn compare_and_inc_u256(
4335 &self,
4336 project_id: &str,
4337 scope_id: &str,
4338 table_name: &str,
4339 primary_key: Vec<Value>,
4340 column: &str,
4341 amount_be: [u8; 32],
4342 expected_seq: u64,
4343 ) -> Result<CommitResult, AedbError> {
4344 self.commit_envelope(TransactionEnvelope {
4345 caller: None,
4346 idempotency_key: None,
4347 write_class: crate::commit::tx::WriteClass::Standard,
4348 assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4349 project_id: project_id.to_string(),
4350 scope_id: scope_id.to_string(),
4351 table_name: table_name.to_string(),
4352 primary_key: primary_key.clone(),
4353 expected_seq,
4354 }],
4355 read_set: crate::commit::tx::ReadSet::default(),
4356 write_intent: crate::commit::tx::WriteIntent {
4357 mutations: vec![Mutation::TableIncU256 {
4358 project_id: project_id.to_string(),
4359 scope_id: scope_id.to_string(),
4360 table_name: table_name.to_string(),
4361 primary_key,
4362 column: column.to_string(),
4363 amount_be,
4364 }],
4365 },
4366 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4367 })
4368 .await
4369 }
4370
4371 #[allow(clippy::too_many_arguments)]
4372 pub async fn compare_and_inc_u256_as(
4373 &self,
4374 caller: CallerContext,
4375 project_id: &str,
4376 scope_id: &str,
4377 table_name: &str,
4378 primary_key: Vec<Value>,
4379 column: &str,
4380 amount_be: [u8; 32],
4381 expected_seq: u64,
4382 ) -> Result<CommitResult, AedbError> {
4383 self.commit_envelope(TransactionEnvelope {
4384 caller: Some(caller),
4385 idempotency_key: None,
4386 write_class: crate::commit::tx::WriteClass::Standard,
4387 assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4388 project_id: project_id.to_string(),
4389 scope_id: scope_id.to_string(),
4390 table_name: table_name.to_string(),
4391 primary_key: primary_key.clone(),
4392 expected_seq,
4393 }],
4394 read_set: crate::commit::tx::ReadSet::default(),
4395 write_intent: crate::commit::tx::WriteIntent {
4396 mutations: vec![Mutation::TableIncU256 {
4397 project_id: project_id.to_string(),
4398 scope_id: scope_id.to_string(),
4399 table_name: table_name.to_string(),
4400 primary_key,
4401 column: column.to_string(),
4402 amount_be,
4403 }],
4404 },
4405 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4406 })
4407 .await
4408 }
4409
4410 #[allow(clippy::too_many_arguments)]
4411 pub async fn compare_and_dec_u256(
4412 &self,
4413 project_id: &str,
4414 scope_id: &str,
4415 table_name: &str,
4416 primary_key: Vec<Value>,
4417 column: &str,
4418 amount_be: [u8; 32],
4419 expected_seq: u64,
4420 ) -> Result<CommitResult, AedbError> {
4421 self.commit_envelope(TransactionEnvelope {
4422 caller: None,
4423 idempotency_key: None,
4424 write_class: crate::commit::tx::WriteClass::Standard,
4425 assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4426 project_id: project_id.to_string(),
4427 scope_id: scope_id.to_string(),
4428 table_name: table_name.to_string(),
4429 primary_key: primary_key.clone(),
4430 expected_seq,
4431 }],
4432 read_set: crate::commit::tx::ReadSet::default(),
4433 write_intent: crate::commit::tx::WriteIntent {
4434 mutations: vec![Mutation::TableDecU256 {
4435 project_id: project_id.to_string(),
4436 scope_id: scope_id.to_string(),
4437 table_name: table_name.to_string(),
4438 primary_key,
4439 column: column.to_string(),
4440 amount_be,
4441 }],
4442 },
4443 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4444 })
4445 .await
4446 }
4447
4448 #[allow(clippy::too_many_arguments)]
4449 pub async fn compare_and_dec_u256_as(
4450 &self,
4451 caller: CallerContext,
4452 project_id: &str,
4453 scope_id: &str,
4454 table_name: &str,
4455 primary_key: Vec<Value>,
4456 column: &str,
4457 amount_be: [u8; 32],
4458 expected_seq: u64,
4459 ) -> Result<CommitResult, AedbError> {
4460 self.commit_envelope(TransactionEnvelope {
4461 caller: Some(caller),
4462 idempotency_key: None,
4463 write_class: crate::commit::tx::WriteClass::Standard,
4464 assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4465 project_id: project_id.to_string(),
4466 scope_id: scope_id.to_string(),
4467 table_name: table_name.to_string(),
4468 primary_key: primary_key.clone(),
4469 expected_seq,
4470 }],
4471 read_set: crate::commit::tx::ReadSet::default(),
4472 write_intent: crate::commit::tx::WriteIntent {
4473 mutations: vec![Mutation::TableDecU256 {
4474 project_id: project_id.to_string(),
4475 scope_id: scope_id.to_string(),
4476 table_name: table_name.to_string(),
4477 primary_key,
4478 column: column.to_string(),
4479 amount_be,
4480 }],
4481 },
4482 base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4483 })
4484 .await
4485 }
4486
    /// Optimistic update-one-row-matching-predicate with before/after capture.
    ///
    /// Loop per attempt: (1) select the first row matching `predicate`
    /// (ordered by primary key for determinism), (2) look up that row's
    /// version at the query's snapshot seq, (3) apply `updates` locally, and
    /// (4) commit an upsert guarded by a RowVersion assertion. On assertion
    /// failure the whole attempt is retried; other errors propagate.
    ///
    /// Returns `Ok(None)` when no row matches, and `Err(Conflict)` once the
    /// retry budget (`MAX_MUTATE_WHERE_RETURNING_RETRIES`) is exhausted.
    async fn mutate_where_returning_inner(
        &self,
        caller: Option<CallerContext>,
        project_id: &str,
        scope_id: &str,
        table_name: &str,
        predicate: Expr,
        updates: Vec<(String, TableUpdateExpr)>,
    ) -> Result<Option<MutateWhereReturningResult>, AedbError> {
        for _ in 0..MAX_MUTATE_WHERE_RETURNING_RETRIES {
            let (_, catalog_snapshot, _) = self.executor.snapshot_state().await;
            let schema = catalog_snapshot
                .tables
                .get(&(namespace_key(project_id, scope_id), table_name.to_string()))
                .cloned()
                .ok_or_else(|| AedbError::NotFound {
                    resource_type: ErrorResourceType::Table,
                    resource_id: format!("{project_id}.{scope_id}.{table_name}"),
                })?;

            // Order by the full primary key so "the first matching row" is
            // deterministic across retries.
            let mut query = Query::select(&["*"])
                .from(table_name)
                .where_(predicate.clone())
                .limit(1);
            for pk in &schema.primary_key {
                query = query.order_by(pk, Order::Asc);
            }
            let query_result = self
                .query_with_options_as(
                    caller.as_ref(),
                    project_id,
                    scope_id,
                    query,
                    QueryOptions {
                        consistency: ConsistencyMode::AtLatest,
                        allow_full_scan: true,
                        ..QueryOptions::default()
                    },
                )
                .await
                .map_err(query_error_to_aedb)?;

            // No matching row: nothing to mutate.
            let Some(before) = query_result.rows.first().cloned() else {
                return Ok(None);
            };
            let primary_key = extract_primary_key_values(&schema, &before)?;
            // Re-read the row's version at the exact snapshot the query saw,
            // so the RowVersion guard matches what was selected.
            let snapshot_seq = query_result.snapshot_seq;
            let lease = self
                .acquire_snapshot(ConsistencyMode::AtSeq(snapshot_seq))
                .await?;
            let expected_seq = lease
                .view
                .keyspace
                .table(project_id, scope_id, table_name)
                .and_then(|table| {
                    table
                        .row_versions
                        .get(&crate::storage::encoded_key::EncodedKey::from_values(
                            &primary_key,
                        ))
                        .copied()
                })
                .unwrap_or(0);
            // Version 0 means the row vanished between query and version
            // lookup; start a fresh attempt.
            if expected_seq == 0 {
                continue;
            }
            let after = apply_table_update_exprs(&schema, &before, &updates)?;

            let commit = self
                .commit_envelope(TransactionEnvelope {
                    caller: caller.clone(),
                    idempotency_key: None,
                    write_class: WriteClass::Standard,
                    assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
                        project_id: project_id.to_string(),
                        scope_id: scope_id.to_string(),
                        table_name: table_name.to_string(),
                        primary_key: primary_key.clone(),
                        expected_seq,
                    }],
                    read_set: ReadSet::default(),
                    write_intent: WriteIntent {
                        mutations: vec![Mutation::Upsert {
                            project_id: project_id.to_string(),
                            scope_id: scope_id.to_string(),
                            table_name: table_name.to_string(),
                            primary_key: primary_key.clone(),
                            row: after.clone(),
                        }],
                    },
                    base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
                })
                .await;

            match commit {
                Ok(commit) => {
                    return Ok(Some(MutateWhereReturningResult {
                        commit,
                        primary_key,
                        before,
                        after,
                    }));
                }
                // Lost the race: some other commit touched the row. Retry.
                Err(AedbError::AssertionFailed { .. }) => continue,
                Err(err) => return Err(err),
            }
        }
        Err(AedbError::Conflict(
            "mutate_where_returning exceeded retry budget".into(),
        ))
    }
4598
    /// Resolves a `ConsistencyMode` to a concrete read view.
    ///
    /// - `AtLatest`: current in-memory state from the executor.
    /// - `AtSeq(n)`: the executor's in-memory version ring first; on a miss,
    ///   the recovery cache, then (for fallback-eligible errors only) a full
    ///   disk recovery at that seq, which is cached for subsequent requests.
    /// - `AtCheckpoint`: the seq of the latest checkpoint from the signed
    ///   manifest, resolved with the same ring → cache → disk fallback chain.
    async fn snapshot_for_consistency(
        &self,
        consistency: ConsistencyMode,
    ) -> Result<SnapshotReadView, AedbError> {
        match consistency {
            ConsistencyMode::AtLatest => {
                let (keyspace, catalog, seq) = self.executor.snapshot_state().await;
                Ok(SnapshotReadView {
                    keyspace: Arc::new(keyspace),
                    catalog: Arc::new(catalog),
                    seq,
                })
            }
            ConsistencyMode::AtSeq(requested) => {
                match self.executor.snapshot_at_seq(requested).await {
                    Ok(view) => Ok(view),
                    Err(e) => {
                        // Cheap path: a previous disk recovery may have cached this seq.
                        if let Some(view) = self.recovery_cache.lock().get(requested) {
                            return Ok(view);
                        }
                        // Only ring-buffer-eviction style errors warrant recovery.
                        if !should_fallback_to_recovery(&e) {
                            return Err(e);
                        }
                        warn!(
                            seq = requested,
                            "AtSeq falling back to disk recovery; version not in ring buffer"
                        );
                        let recovered =
                            recover_at_seq_with_config(&self.dir, requested, &self._config)?;
                        let view = SnapshotReadView {
                            keyspace: Arc::new(recovered.keyspace.snapshot()),
                            catalog: Arc::new(recovered.catalog.snapshot()),
                            seq: recovered.current_seq,
                        };
                        // Cache under the requested seq so repeated reads skip recovery.
                        self.recovery_cache.lock().put(requested, view.clone());
                        Ok(view)
                    }
                }
            }
            ConsistencyMode::AtCheckpoint => {
                // Signed manifest is the source of truth for checkpoint seqs.
                let manifest = crate::manifest::atomic::load_manifest_signed(
                    &self.dir,
                    self._config.hmac_key(),
                )?;
                let Some(cp) = manifest.checkpoints.last() else {
                    return Err(AedbError::Unavailable {
                        message: "no checkpoint available".into(),
                    });
                };
                match self.executor.snapshot_at_seq(cp.seq).await {
                    Ok(view) => Ok(view),
                    Err(e) => {
                        if let Some(view) = self.recovery_cache.lock().get(cp.seq) {
                            return Ok(view);
                        }
                        if !should_fallback_to_recovery(&e) {
                            return Err(e);
                        }
                        let recovered =
                            recover_at_seq_with_config(&self.dir, cp.seq, &self._config)?;
                        let view = SnapshotReadView {
                            keyspace: Arc::new(recovered.keyspace.snapshot()),
                            catalog: Arc::new(recovered.catalog.snapshot()),
                            seq: recovered.current_seq,
                        };
                        self.recovery_cache.lock().put(cp.seq, view.clone());
                        Ok(view)
                    }
                }
            }
        }
    }
4671
4672 async fn acquire_snapshot(
4673 &self,
4674 consistency: ConsistencyMode,
4675 ) -> Result<SnapshotLease, AedbError> {
4676 let view = self.snapshot_for_consistency(consistency).await?;
4677 let mut mgr = self.snapshot_manager.lock();
4678 let handle = mgr.acquire_bounded(view.clone(), self._config.max_concurrent_snapshots)?;
4679 let checked = mgr
4680 .get_checked(handle, self._config.max_snapshot_age_ms)?
4681 .clone();
4682 Ok(SnapshotLease {
4683 manager: Arc::clone(&self.snapshot_manager),
4684 handle,
4685 view: checked,
4686 })
4687 }
4688
4689 pub async fn preflight(&self, mutation: Mutation) -> PreflightResult {
4690 let (snapshot, catalog, _) = self.executor.snapshot_state().await;
4691 preflight(&snapshot, &catalog, &mutation)
4692 }
4693
4694 pub async fn preflight_plan(&self, mutation: Mutation) -> crate::commit::tx::PreflightPlan {
4695 let (snapshot, catalog, base_seq) = self.executor.snapshot_state().await;
4696 preflight_plan(&snapshot, &catalog, &mutation, base_seq)
4697 }
4698
4699 pub async fn preflight_as(
4700 &self,
4701 caller: &CallerContext,
4702 mutation: Mutation,
4703 ) -> Result<PreflightResult, AedbError> {
4704 ensure_external_caller_allowed(caller)?;
4705 let (snapshot, catalog, _) = self.executor.snapshot_state().await;
4706 validate_permissions(&catalog, Some(caller), &mutation)?;
4707 Ok(preflight(&snapshot, &catalog, &mutation))
4708 }
4709
4710 pub async fn preflight_plan_as(
4711 &self,
4712 caller: &CallerContext,
4713 mutation: Mutation,
4714 ) -> Result<crate::commit::tx::PreflightPlan, AedbError> {
4715 ensure_external_caller_allowed(caller)?;
4716 let (snapshot, catalog, base_seq) = self.executor.snapshot_state().await;
4717 validate_permissions(&catalog, Some(caller), &mutation)?;
4718 Ok(preflight_plan(&snapshot, &catalog, &mutation, base_seq))
4719 }
4720
4721 pub async fn commit_ddl(&self, op: DdlOperation) -> Result<DdlResult, AedbError> {
4722 let (_, catalog, _) = self.executor.snapshot_state().await;
4723 let applied = ddl_would_apply(&catalog, &op);
4724 let result = self.commit(Mutation::Ddl(op)).await?;
4725 Ok(DdlResult {
4726 applied,
4727 seq: result.commit_seq,
4728 })
4729 }
4730
4731 pub async fn commit_ddl_batch(
4732 &self,
4733 ddl_ops: Vec<DdlOperation>,
4734 ) -> Result<DdlBatchResult, AedbError> {
4735 self.commit_ddl_batch_with_mode(ddl_ops, DdlBatchOrderMode::AsProvided)
4736 .await
4737 }
4738
4739 pub async fn commit_ddl_batch_dependency_aware(
4740 &self,
4741 ddl_ops: Vec<DdlOperation>,
4742 ) -> Result<DdlBatchResult, AedbError> {
4743 self.commit_ddl_batch_with_mode(ddl_ops, DdlBatchOrderMode::DependencyAware)
4744 .await
4745 }
4746
4747 pub async fn commit_ddl_batch_with_mode(
4748 &self,
4749 ddl_ops: Vec<DdlOperation>,
4750 mode: DdlBatchOrderMode,
4751 ) -> Result<DdlBatchResult, AedbError> {
4752 if ddl_ops.is_empty() {
4753 return Err(AedbError::InvalidConfig {
4754 message: "ddl batch cannot be empty".into(),
4755 });
4756 }
4757 let ddl_ops = match mode {
4758 DdlBatchOrderMode::AsProvided => ddl_ops,
4759 DdlBatchOrderMode::DependencyAware => order_ddl_ops_for_batch(ddl_ops)?,
4760 };
4761 let (_, catalog, _) = self.executor.snapshot_state().await;
4762 let mut planned_catalog = catalog;
4763 let mut results = Vec::with_capacity(ddl_ops.len());
4764 for op in &ddl_ops {
4765 let applied = ddl_would_apply(&planned_catalog, op);
4766 planned_catalog.apply_ddl(op.clone())?;
4767 results.push(DdlResult { applied, seq: 0 });
4768 }
4769 let mutations = ddl_ops.into_iter().map(Mutation::Ddl).collect();
4770 let base_seq = self.executor.current_seq().await;
4771 let committed = self
4772 .commit_envelope(TransactionEnvelope {
4773 caller: None,
4774 idempotency_key: None,
4775 write_class: crate::commit::tx::WriteClass::Standard,
4776 assertions: Vec::new(),
4777 read_set: crate::commit::tx::ReadSet::default(),
4778 write_intent: crate::commit::tx::WriteIntent { mutations },
4779 base_seq,
4780 })
4781 .await?;
4782 for result in &mut results {
4783 result.seq = committed.commit_seq;
4784 }
4785 Ok(DdlBatchResult {
4786 seq: committed.commit_seq,
4787 results,
4788 })
4789 }
4790
4791 pub fn add_lifecycle_hook(&self, hook: Arc<dyn LifecycleHook>) {
4792 self.lifecycle_hooks.lock().push(hook);
4793 }
4794
4795 pub fn remove_lifecycle_hook(&self, hook: &Arc<dyn LifecycleHook>) {
4796 let mut hooks = self.lifecycle_hooks.lock();
4797 hooks.retain(|existing| !Arc::ptr_eq(existing, hook));
4798 }
4799
4800 pub fn add_telemetry_hook(&self, hook: Arc<dyn QueryCommitTelemetryHook>) {
4801 self.telemetry_hooks.lock().push(hook);
4802 }
4803
4804 pub fn remove_telemetry_hook(&self, hook: &Arc<dyn QueryCommitTelemetryHook>) {
4805 let mut hooks = self.telemetry_hooks.lock();
4806 hooks.retain(|existing| !Arc::ptr_eq(existing, hook));
4807 }
4808
4809 pub async fn backup_full_to_remote(
4810 &self,
4811 adapter: &dyn RemoteBackupAdapter,
4812 uri: &str,
4813 ) -> Result<BackupManifest, AedbError> {
4814 let temp = tempfile::tempdir()?;
4815 let manifest = self.backup_full(temp.path()).await?;
4816 adapter.store_backup_dir(uri, temp.path())?;
4817 Ok(manifest)
4818 }
4819
4820 pub fn restore_from_remote(
4821 adapter: &dyn RemoteBackupAdapter,
4822 uri: &str,
4823 data_dir: &Path,
4824 config: &AedbConfig,
4825 ) -> Result<u64, AedbError> {
4826 let temp = tempfile::tempdir()?;
4827 let chain = adapter.materialize_backup_chain(uri, temp.path())?;
4828 Self::restore_from_backup_chain(&chain, data_dir, config, None)
4829 }
4830
4831 pub async fn create_project(&self, project_id: &str) -> Result<(), AedbError> {
4832 self.commit_ddl(DdlOperation::CreateProject {
4833 owner_id: None,
4834 project_id: project_id.to_string(),
4835 if_not_exists: true,
4836 })
4837 .await?;
4838 Ok(())
4839 }
4840
4841 pub async fn drop_project(&self, project_id: &str) -> Result<(), AedbError> {
4842 self.commit_ddl(DdlOperation::DropProject {
4843 project_id: project_id.to_string(),
4844 if_exists: true,
4845 })
4846 .await?;
4847 Ok(())
4848 }
4849
4850 pub async fn create_scope(&self, project_id: &str, scope_id: &str) -> Result<(), AedbError> {
4851 self.commit_ddl(DdlOperation::CreateScope {
4852 owner_id: None,
4853 project_id: project_id.to_string(),
4854 scope_id: scope_id.to_string(),
4855 if_not_exists: true,
4856 })
4857 .await?;
4858 Ok(())
4859 }
4860
4861 pub async fn drop_scope(&self, project_id: &str, scope_id: &str) -> Result<(), AedbError> {
4862 self.commit_ddl(DdlOperation::DropScope {
4863 project_id: project_id.to_string(),
4864 scope_id: scope_id.to_string(),
4865 if_exists: true,
4866 })
4867 .await?;
4868 Ok(())
4869 }
4870
4871 pub async fn enable_kv_projection(
4872 &self,
4873 project_id: &str,
4874 scope_id: &str,
4875 ) -> Result<(), AedbError> {
4876 self.commit(Mutation::Ddl(DdlOperation::EnableKvProjection {
4877 project_id: project_id.to_string(),
4878 scope_id: scope_id.to_string(),
4879 }))
4880 .await?;
4881 Ok(())
4882 }
4883
4884 pub async fn disable_kv_projection(
4885 &self,
4886 project_id: &str,
4887 scope_id: &str,
4888 ) -> Result<(), AedbError> {
4889 self.commit(Mutation::Ddl(DdlOperation::DisableKvProjection {
4890 project_id: project_id.to_string(),
4891 scope_id: scope_id.to_string(),
4892 }))
4893 .await?;
4894 Ok(())
4895 }
4896
4897 pub async fn list_scopes(&self, project_id: &str) -> Result<Vec<String>, AedbError> {
4898 let (_, catalog, _) = self.executor.snapshot_state().await;
4899 Ok(catalog.list_scopes(project_id))
4900 }
4901
4902 pub async fn project_exists(&self, project_id: &str) -> Result<bool, AedbError> {
4903 let (_, catalog, _) = self.executor.snapshot_state().await;
4904 Ok(catalog.projects.contains_key(project_id))
4905 }
4906
4907 pub async fn scope_exists(&self, project_id: &str, scope_id: &str) -> Result<bool, AedbError> {
4908 let (_, catalog, _) = self.executor.snapshot_state().await;
4909 Ok(catalog
4910 .scopes
4911 .contains_key(&(project_id.to_string(), scope_id.to_string())))
4912 }
4913
4914 pub async fn table_exists(
4915 &self,
4916 project_id: &str,
4917 scope_id: &str,
4918 table_name: &str,
4919 ) -> Result<bool, AedbError> {
4920 let (_, catalog, _) = self.executor.snapshot_state().await;
4921 Ok(catalog
4922 .tables
4923 .contains_key(&(namespace_key(project_id, scope_id), table_name.to_string())))
4924 }
4925
4926 pub async fn index_exists(
4927 &self,
4928 project_id: &str,
4929 scope_id: &str,
4930 table_name: &str,
4931 index_name: &str,
4932 ) -> Result<bool, AedbError> {
4933 let (_, catalog, _) = self.executor.snapshot_state().await;
4934 Ok(catalog.indexes.contains_key(&(
4935 namespace_key(project_id, scope_id),
4936 table_name.to_string(),
4937 index_name.to_string(),
4938 )))
4939 }
4940
4941 pub async fn list_projects(&self) -> Result<Vec<ProjectInfo>, AedbError> {
4942 let (_, catalog, _) = self.executor.snapshot_state().await;
4943 let mut infos: Vec<ProjectInfo> = catalog
4944 .projects
4945 .values()
4946 .map(|project| {
4947 let scope_count = catalog
4948 .scopes
4949 .values()
4950 .filter(|scope| scope.project_id == project.project_id)
4951 .count() as u32;
4952 ProjectInfo {
4953 project_id: project.project_id.clone(),
4954 scope_count,
4955 created_at_micros: project.created_at_micros,
4956 }
4957 })
4958 .collect();
4959 infos.sort_by(|a, b| a.project_id.cmp(&b.project_id));
4960 Ok(infos)
4961 }
4962
4963 pub async fn list_scopes_info(&self, project_id: &str) -> Result<Vec<ScopeInfo>, AedbError> {
4964 let (snapshot, catalog, _) = self.executor.snapshot_state().await;
4965 let mut infos: Vec<ScopeInfo> = catalog
4966 .scopes
4967 .values()
4968 .filter(|scope| scope.project_id == project_id)
4969 .map(|scope| {
4970 let table_count = catalog
4971 .tables
4972 .values()
4973 .filter(|table| {
4974 table.project_id == scope.project_id && table.scope_id == scope.scope_id
4975 })
4976 .count() as u32;
4977 let kv_key_count = snapshot
4978 .namespaces
4979 .get(&NamespaceId::project_scope(project_id, &scope.scope_id))
4980 .map_or(0, |ns| ns.kv.entries.len() as u64);
4981 ScopeInfo {
4982 scope_id: scope.scope_id.clone(),
4983 table_count,
4984 kv_key_count,
4985 created_at_micros: scope.created_at_micros,
4986 }
4987 })
4988 .collect();
4989 infos.sort_by(|a, b| a.scope_id.cmp(&b.scope_id));
4990 Ok(infos)
4991 }
4992
4993 pub async fn list_tables_info(
4994 &self,
4995 project_id: &str,
4996 scope_id: &str,
4997 ) -> Result<Vec<TableInfo>, AedbError> {
4998 let (snapshot, catalog, _) = self.executor.snapshot_state().await;
4999 let ns_key = namespace_key(project_id, scope_id);
5000 let ns_id = NamespaceId::project_scope(project_id, scope_id);
5001 let mut infos: Vec<TableInfo> = catalog
5002 .tables
5003 .values()
5004 .filter(|table| table.project_id == project_id && table.scope_id == scope_id)
5005 .map(|table| {
5006 let index_count = catalog
5007 .indexes
5008 .keys()
5009 .filter(|(ns, name, _)| ns == &ns_key && name == &table.table_name)
5010 .count() as u32;
5011 let row_count = snapshot
5012 .namespaces
5013 .get(&ns_id)
5014 .and_then(|ns| ns.tables.get(&table.table_name))
5015 .map_or(0, |t| t.rows.len() as u64);
5016 TableInfo {
5017 table_name: table.table_name.clone(),
5018 column_count: table.columns.len() as u32,
5019 index_count,
5020 row_count,
5021 }
5022 })
5023 .collect();
5024 infos.sort_by(|a, b| a.table_name.cmp(&b.table_name));
5025 Ok(infos)
5026 }
5027
5028 pub async fn set_read_policy(
5029 &self,
5030 project_id: &str,
5031 scope_id: &str,
5032 table_name: &str,
5033 predicate: Expr,
5034 ) -> Result<(), AedbError> {
5035 self.commit(Mutation::Ddl(DdlOperation::SetReadPolicy {
5036 project_id: project_id.to_string(),
5037 scope_id: scope_id.to_string(),
5038 table_name: table_name.to_string(),
5039 predicate,
5040 actor_id: None,
5041 }))
5042 .await?;
5043 Ok(())
5044 }
5045
5046 pub async fn clear_read_policy(
5047 &self,
5048 project_id: &str,
5049 scope_id: &str,
5050 table_name: &str,
5051 ) -> Result<(), AedbError> {
5052 self.commit(Mutation::Ddl(DdlOperation::ClearReadPolicy {
5053 project_id: project_id.to_string(),
5054 scope_id: scope_id.to_string(),
5055 table_name: table_name.to_string(),
5056 actor_id: None,
5057 }))
5058 .await?;
5059 Ok(())
5060 }
5061
5062 pub async fn transfer_ownership(
5063 &self,
5064 resource_type: ResourceType,
5065 project_id: &str,
5066 scope_id: Option<&str>,
5067 table_name: Option<&str>,
5068 new_owner_id: &str,
5069 ) -> Result<CommitResult, AedbError> {
5070 self.commit(Mutation::Ddl(DdlOperation::TransferOwnership {
5071 resource_type,
5072 project_id: project_id.to_string(),
5073 scope_id: scope_id.map(ToString::to_string),
5074 table_name: table_name.map(ToString::to_string),
5075 new_owner_id: new_owner_id.to_string(),
5076 actor_id: None,
5077 }))
5078 .await
5079 }
5080
5081 pub async fn list_tables(
5082 &self,
5083 project_id: &str,
5084 scope_id: &str,
5085 ) -> Result<Vec<TableSchema>, AedbError> {
5086 let (_, catalog, _) = self.executor.snapshot_state().await;
5087 Ok(catalog
5088 .tables
5089 .values()
5090 .filter(|t| t.project_id == project_id && t.scope_id == scope_id)
5091 .cloned()
5092 .collect())
5093 }
5094
5095 pub async fn describe_table(
5096 &self,
5097 project_id: &str,
5098 scope_id: &str,
5099 table: &str,
5100 ) -> Result<TableSchema, AedbError> {
5101 let (_, catalog, _) = self.executor.snapshot_state().await;
5102 catalog
5103 .tables
5104 .get(&(namespace_key(project_id, scope_id), table.to_string()))
5105 .cloned()
5106 .ok_or_else(|| AedbError::NotFound {
5107 resource_type: ErrorResourceType::Table,
5108 resource_id: format!("{project_id}.{scope_id}.{table}"),
5109 })
5110 }
5111
5112 pub async fn list_indexes(
5113 &self,
5114 project_id: &str,
5115 scope_id: &str,
5116 table_name: &str,
5117 ) -> Result<Vec<IndexDef>, AedbError> {
5118 let (_, catalog, _) = self.executor.snapshot_state().await;
5119 let mut out: Vec<IndexDef> = catalog
5120 .indexes
5121 .values()
5122 .filter(|idx| {
5123 idx.project_id == project_id
5124 && idx.scope_id == scope_id
5125 && idx.table_name == table_name
5126 })
5127 .cloned()
5128 .collect();
5129 out.sort_by(|a, b| a.index_name.cmp(&b.index_name));
5130 Ok(out)
5131 }
5132
5133 pub async fn describe_index(
5134 &self,
5135 project_id: &str,
5136 scope_id: &str,
5137 table_name: &str,
5138 index_name: &str,
5139 ) -> Result<IndexDef, AedbError> {
5140 let (_, catalog, _) = self.executor.snapshot_state().await;
5141 catalog
5142 .indexes
5143 .get(&(
5144 namespace_key(project_id, scope_id),
5145 table_name.to_string(),
5146 index_name.to_string(),
5147 ))
5148 .cloned()
5149 .ok_or_else(|| AedbError::NotFound {
5150 resource_type: ErrorResourceType::Index,
5151 resource_id: format!("{project_id}.{scope_id}.{table_name}.{index_name}"),
5152 })
5153 }
5154
5155 pub async fn list_async_indexes(
5156 &self,
5157 project_id: &str,
5158 scope_id: &str,
5159 table_name: &str,
5160 ) -> Result<Vec<AsyncIndexDef>, AedbError> {
5161 let (_, catalog, _) = self.executor.snapshot_state().await;
5162 let mut out: Vec<AsyncIndexDef> = catalog
5163 .async_indexes
5164 .values()
5165 .filter(|idx| {
5166 idx.project_id == project_id
5167 && idx.scope_id == scope_id
5168 && idx.table_name == table_name
5169 })
5170 .cloned()
5171 .collect();
5172 out.sort_by(|a, b| a.index_name.cmp(&b.index_name));
5173 Ok(out)
5174 }
5175
5176 pub async fn describe_async_index(
5177 &self,
5178 project_id: &str,
5179 scope_id: &str,
5180 table_name: &str,
5181 index_name: &str,
5182 ) -> Result<AsyncIndexDef, AedbError> {
5183 let (_, catalog, _) = self.executor.snapshot_state().await;
5184 catalog
5185 .async_indexes
5186 .get(&(
5187 namespace_key(project_id, scope_id),
5188 table_name.to_string(),
5189 index_name.to_string(),
5190 ))
5191 .cloned()
5192 .ok_or_else(|| AedbError::NotFound {
5193 resource_type: ErrorResourceType::Index,
5194 resource_id: format!("{project_id}.{scope_id}.{table_name}.{index_name}"),
5195 })
5196 }
5197
5198 pub async fn shutdown(&self) -> Result<(), AedbError> {
5199 let _ = self.force_fsync().await?;
5201 let _ = self.checkpoint_now().await?;
5202 Ok(())
5203 }
5204
    /// Writes a checkpoint of the current durable state and rewrites the
    /// manifest to reference it. Returns the seq the checkpoint covers.
    pub async fn checkpoint_now(&self) -> Result<u64, AedbError> {
        // Serialize checkpoints: only one may run at a time.
        let _checkpoint_guard = self.checkpoint_lock.lock().await;

        // Make everything appended so far durable before reading the head.
        let _ = self.force_fsync().await?;

        // Checkpoint exactly the durable head so nothing non-durable leaks in.
        let seq = self.executor.durable_head_seq_now();
        let lease = self.acquire_snapshot(ConsistencyMode::AtSeq(seq)).await?;
        let snapshot = lease.view.keyspace.as_ref();
        let catalog = lease.view.catalog.as_ref();
        // Keep only idempotency records at or below the checkpoint seq;
        // records after it belong to the WAL tail.
        let mut idempotency = self.executor.idempotency_snapshot().await;
        idempotency.retain(|_, record| record.commit_seq <= seq);
        let checkpoint = write_checkpoint_with_key(
            snapshot,
            catalog,
            seq,
            &self.dir,
            self._config.checkpoint_key(),
            self._config.checkpoint_key_id.clone(),
            idempotency,
            self._config.checkpoint_compression_level,
        )?;

        // Segments still relevant on top of this checkpoint (per
        // read_segments_for_checkpoint); the manifest records them for replay.
        let segments = read_segments_for_checkpoint(&self.dir, seq)?;
        let active_segment_seq = segments
            .last()
            .map(|segment| segment.segment_seq)
            // No segments left: the next segment starts right after the seq.
            .unwrap_or(seq.saturating_add(1));
        let manifest = Manifest {
            durable_seq: seq,
            visible_seq: seq,
            active_segment_seq,
            checkpoints: vec![checkpoint],
            segments,
        };
        write_manifest_atomic_signed(&manifest, &self.dir, self._config.hmac_key())?;
        Ok(seq)
    }
5246
    /// Writes a full backup (fresh checkpoint plus current WAL segments) into
    /// `backup_dir` and returns its signed manifest.
    pub async fn backup_full(&self, backup_dir: &Path) -> Result<BackupManifest, AedbError> {
        create_private_dir_all(backup_dir)?;
        create_private_dir_all(&backup_dir.join("wal_tail"))?;

        // Pin a consistent view; everything below is relative to snapshot_seq.
        let lease = self.acquire_snapshot(ConsistencyMode::AtLatest).await?;
        let snapshot_seq = lease.view.seq;
        let idempotency = self.executor.idempotency_snapshot().await;
        let checkpoint = write_checkpoint_with_key(
            lease.view.keyspace.as_ref(),
            lease.view.catalog.as_ref(),
            snapshot_seq,
            backup_dir,
            self._config.checkpoint_key(),
            self._config.checkpoint_key_id.clone(),
            idempotency,
            self._config.checkpoint_compression_level,
        )?;

        // Record a SHA-256 per file so the restore path can verify integrity
        // before replaying anything.
        let mut wal_segments = Vec::new();
        let mut file_sha256 = HashMap::new();
        file_sha256.insert(
            checkpoint.filename.clone(),
            sha256_file_hex(&backup_dir.join(&checkpoint.filename))?,
        );

        // Copy every current WAL segment into the backup's wal_tail/ dir.
        for segment in read_segments(&self.dir)? {
            let src = self.dir.join(&segment.filename);
            let rel = format!("wal_tail/{}", segment.filename);
            let dst = backup_dir.join(&rel);
            fs::copy(src, &dst)?;
            wal_segments.push(segment.filename);
            file_sha256.insert(rel, sha256_file_hex(&dst)?);
        }

        // Order segment names by their embedded sequence number for replay.
        wal_segments.sort_by_key(|name| segment_seq_from_name(name).unwrap_or(0));
        let created_at_micros = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_micros() as u64;
        let manifest = BackupManifest {
            backup_id: format!("bk_{}_{}", created_at_micros, snapshot_seq),
            backup_type: "full".into(),
            parent_backup_id: None,
            from_seq: None,
            created_at_micros,
            aedb_version: env!("CARGO_PKG_VERSION").to_string(),
            checkpoint_seq: snapshot_seq,
            wal_head_seq: snapshot_seq,
            checkpoint_file: checkpoint.filename,
            wal_segments,
            file_sha256,
        };
        write_backup_manifest(backup_dir, &manifest, self._config.hmac_key())?;
        Ok(manifest)
    }
5303
5304 pub async fn backup_full_to_file(
5308 &self,
5309 backup_file: &Path,
5310 ) -> Result<BackupManifest, AedbError> {
5311 let temp = tempfile::tempdir()?;
5312 let manifest = self.backup_full(temp.path()).await?;
5313 write_backup_archive(temp.path(), backup_file, self._config.checkpoint_key())?;
5314 Ok(manifest)
5315 }
5316
    /// Writes an incremental backup into `backup_dir`, containing only the
    /// WAL segments that overlap the range after the parent backup's head.
    ///
    /// The parent backup is verified first; the incremental reuses the
    /// parent's checkpoint file and records `parent_backup_id` so restores
    /// can chain them.
    pub async fn backup_incremental(
        &self,
        backup_dir: &Path,
        parent_backup_dir: &Path,
    ) -> Result<BackupManifest, AedbError> {
        create_private_dir_all(backup_dir)?;
        create_private_dir_all(&backup_dir.join("wal_tail"))?;
        let parent = load_backup_manifest(parent_backup_dir, self._config.hmac_key())?;
        verify_backup_files(parent_backup_dir, &parent)?;
        // Everything strictly after the parent's WAL head belongs to us.
        let from_seq = parent.wal_head_seq.saturating_add(1);

        let lease = self.acquire_snapshot(ConsistencyMode::AtLatest).await?;
        let to_seq = lease.view.seq;
        let created_at_micros = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_micros() as u64;
        let mut wal_segments = Vec::new();
        let mut file_sha256 = HashMap::new();

        for segment in read_segments(&self.dir)? {
            let src = self.dir.join(&segment.filename);
            // Segments with no scannable seq range (presumably empty) are
            // skipped entirely.
            let Some((min_seq, max_seq)) = scan_segment_seq_range(&src)? else {
                continue;
            };
            // Keep only segments whose range intersects [from_seq, to_seq].
            if max_seq < from_seq || min_seq > to_seq {
                continue;
            }
            let rel = format!("wal_tail/{}", segment.filename);
            let dst = backup_dir.join(&rel);
            fs::copy(src, &dst)?;
            wal_segments.push(segment.filename);
            file_sha256.insert(rel, sha256_file_hex(&dst)?);
        }

        // Order segment names by their embedded sequence number for replay.
        wal_segments.sort_by_key(|name| segment_seq_from_name(name).unwrap_or(0));
        let manifest = BackupManifest {
            backup_id: format!("bk_{}_{}", created_at_micros, to_seq),
            backup_type: "incremental".into(),
            parent_backup_id: Some(parent.backup_id),
            from_seq: Some(from_seq),
            created_at_micros,
            aedb_version: env!("CARGO_PKG_VERSION").to_string(),
            // Checkpoint data lives in the parent; only WAL deltas are here.
            checkpoint_seq: parent.checkpoint_seq,
            wal_head_seq: to_seq,
            checkpoint_file: parent.checkpoint_file,
            wal_segments,
            file_sha256,
        };
        write_backup_manifest(backup_dir, &manifest, self._config.hmac_key())?;
        Ok(manifest)
    }
5369
5370 pub fn restore_from_backup(
5371 backup_dir: &Path,
5372 data_dir: &Path,
5373 config: &AedbConfig,
5374 ) -> Result<u64, AedbError> {
5375 Self::restore_from_backup_chain(&[backup_dir.to_path_buf()], data_dir, config, None)
5376 }
5377
5378 pub fn restore_from_backup_file(
5380 backup_file: &Path,
5381 data_dir: &Path,
5382 config: &AedbConfig,
5383 ) -> Result<u64, AedbError> {
5384 let temp = tempfile::tempdir()?;
5385 extract_backup_archive(backup_file, temp.path(), config.checkpoint_key())?;
5386 Self::restore_from_backup(temp.path(), data_dir, config)
5387 }
5388
    /// Restores a full + incremental backup chain into an empty `data_dir`,
    /// optionally stopping replay at `target_seq`. Returns the restored seq.
    ///
    /// The chain is verified before anything is written; restore then loads
    /// the full backup's checkpoint and replays each backup's WAL segments in
    /// order until the target is reached.
    pub fn restore_from_backup_chain(
        backup_dirs: &[PathBuf],
        data_dir: &Path,
        config: &AedbConfig,
        target_seq: Option<u64>,
    ) -> Result<u64, AedbError> {
        let chain = load_verified_backup_chain(backup_dirs, config)?;
        // Refuse to restore over an existing, non-empty directory.
        if data_dir.exists() && fs::read_dir(data_dir)?.next().is_some() {
            return Err(AedbError::Validation(
                "restore target directory must be empty".into(),
            ));
        }
        create_private_dir_all(data_dir)?;
        // Default target: everything up to the last backup's WAL head.
        let effective_target =
            target_seq.unwrap_or(chain.last().expect("non-empty").1.wal_head_seq);
        let full = &chain[0].1;
        if effective_target < full.checkpoint_seq {
            return Err(AedbError::Validation(
                "target_seq is older than full checkpoint_seq".into(),
            ));
        }

        // Seed state from the full backup's checkpoint.
        let checkpoint_path = resolve_backup_path(&chain[0].0, &full.checkpoint_file)?;
        let (mut keyspace, mut catalog, checkpoint_seq, mut idempotency) =
            load_checkpoint_with_key(&checkpoint_path, config.checkpoint_key())?;
        let mut current_seq = checkpoint_seq;
        // Last (seq, hash) verified so far; anchors hash-chain checks across
        // consecutive backups in the chain.
        let mut last_verified_chain: Option<(u64, [u8; 32])> = None;

        for (dir, manifest) in &chain {
            let replay_to = effective_target.min(manifest.wal_head_seq);
            // Skip backups fully covered by what has already been replayed.
            if replay_to <= current_seq {
                continue;
            }
            let mut wal_paths = Vec::new();
            for seg in &manifest.wal_segments {
                wal_paths.push(resolve_backup_path(dir, &format!("wal_tail/{seg}"))?);
            }
            // Replay in segment-seq order, derived from the file names.
            wal_paths.sort_by_key(|p| {
                p.file_name()
                    .and_then(|n| segment_seq_from_name(&n.to_string_lossy()))
                    .unwrap_or(0)
            });
            if config.hash_chain_required && !wal_paths.is_empty() {
                let first_seq = wal_paths
                    .first()
                    .and_then(|p| p.file_name())
                    .and_then(|n| segment_seq_from_name(&n.to_string_lossy()));
                // Anchor to the previous backup's verified hash only when this
                // backup's segments strictly follow it; else verify unanchored.
                let chain_anchor = match (last_verified_chain, first_seq) {
                    (Some((last_seq, last_hash)), Some(first_seq)) if first_seq > last_seq => {
                        Some(last_hash)
                    }
                    _ => None,
                };
                if let Some(last) = verify_hash_chain_batch(&wal_paths, chain_anchor)? {
                    last_verified_chain = Some(last);
                }
            }
            current_seq = replay_segments(
                &wal_paths,
                current_seq,
                Some(replay_to),
                None,
                false,
                config.strict_recovery(),
                &mut keyspace,
                &mut catalog,
                &mut idempotency,
            )?;
            if current_seq >= effective_target {
                break;
            }
        }
        let restored_seq = current_seq;
        // Persist the restored state as a fresh checkpoint + manifest so the
        // restored directory opens without the backup's WAL files.
        let cp = write_checkpoint_with_key(
            &keyspace.snapshot(),
            &catalog,
            restored_seq,
            data_dir,
            config.checkpoint_key(),
            config.checkpoint_key_id.clone(),
            idempotency,
            config.checkpoint_compression_level,
        )?;
        let restored_manifest = Manifest {
            durable_seq: restored_seq,
            visible_seq: restored_seq,
            active_segment_seq: restored_seq + 1,
            checkpoints: vec![cp],
            segments: Vec::new(),
        };
        write_manifest_atomic_signed(&restored_manifest, data_dir, config.hmac_key())?;
        Ok(restored_seq)
    }
5482
5483 pub fn restore_from_backup_chain_at_time(
5484 backup_dirs: &[PathBuf],
5485 data_dir: &Path,
5486 config: &AedbConfig,
5487 target_time_micros: u64,
5488 ) -> Result<u64, AedbError> {
5489 let chain = load_verified_backup_chain(backup_dirs, config)?;
5490 let target_seq = resolve_target_seq_for_time(&chain, target_time_micros)?;
5491 Self::restore_from_backup_chain(backup_dirs, data_dir, config, Some(target_seq))
5492 }
5493
    /// Restores a single namespace (`project_id`/`scope_id`) from a backup
    /// chain into the live data in `data_dir`, leaving all other namespaces
    /// untouched. Returns the merged sequence number.
    ///
    /// Strategy: restore the full chain into a temp dir, recover both the
    /// live and restored states, splice the target namespace's keyspace data
    /// and catalog entries (scope, tables, indexes, async indexes) from the
    /// restored state into the live state, then checkpoint the merge.
    pub fn restore_namespace_from_backup_chain(
        backup_dirs: &[PathBuf],
        data_dir: &Path,
        config: &AedbConfig,
        project_id: &str,
        scope_id: &str,
        target_seq: Option<u64>,
    ) -> Result<u64, AedbError> {
        let chain = load_verified_backup_chain(backup_dirs, config)?;
        let effective_target =
            target_seq.unwrap_or(chain.last().expect("non-empty").1.wal_head_seq);
        // Current on-disk state; all non-target namespaces come from here.
        let live = recover_with_config(data_dir, config)?;

        // Fully restore the chain off to the side, then recover that state.
        let temp = tempfile::tempdir()?;
        let _ = Self::restore_from_backup_chain(
            backup_dirs,
            temp.path(),
            config,
            Some(effective_target),
        )?;
        let restored = recover_with_config(temp.path(), config)?;

        let ns_key = namespace_key(project_id, scope_id);
        let ns_id = NamespaceId::project_scope(project_id, scope_id);

        // Keyspace: take every namespace from live except the target, then
        // overlay the target namespace from the restored state (if present —
        // absence means the namespace is removed from the merge).
        let mut merged_namespaces = live
            .keyspace
            .namespaces
            .iter()
            .filter(|(ns, _)| **ns != ns_id)
            .map(|(ns, namespace)| (ns.clone(), namespace.clone()))
            .collect::<HashMap<_, _>>();
        if let Some(namespace) = restored.keyspace.namespaces.get(&ns_id) {
            merged_namespaces.insert(ns_id.clone(), namespace.clone());
        }
        // Same live-except-target / overlay-from-restored split for the
        // async-index data keyed by namespace.
        let mut merged_async_indexes = live
            .keyspace
            .async_indexes
            .iter()
            .filter(|(key, _)| key.0 != ns_id)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect::<HashMap<_, _>>();
        for (key, value) in restored.keyspace.async_indexes.iter() {
            if key.0 == ns_id {
                merged_async_indexes.insert(key.clone(), value.clone());
            }
        }
        let merged_keyspace = Keyspace {
            primary_index_backend: live.keyspace.primary_index_backend,
            namespaces: Arc::new(merged_namespaces.into()),
            async_indexes: Arc::new(merged_async_indexes.into()),
        };

        // Catalog: start from live, then overlay the restored project/scope
        // entries for the target namespace.
        let mut merged_catalog = live.catalog.clone();
        if let Some(project) = restored.catalog.projects.get(project_id) {
            merged_catalog
                .projects
                .insert(project_id.to_string(), project.clone());
        }
        let scope_key = (project_id.to_string(), scope_id.to_string());
        match restored.catalog.scopes.get(&scope_key) {
            Some(scope) => {
                merged_catalog
                    .scopes
                    .insert(scope_key.clone(), scope.clone());
            }
            None => {
                // Scope absent in the restored state: drop it from the merge.
                merged_catalog.scopes.remove(&scope_key);
            }
        }
        // Tables: remove all of the target namespace's tables from the merged
        // catalog, then copy the restored namespace's tables in.
        let table_keys: Vec<(String, String)> = merged_catalog
            .tables
            .keys()
            .filter(|(ns, _)| ns == &ns_key)
            .cloned()
            .collect();
        for key in table_keys {
            merged_catalog.tables.remove(&key);
        }
        for (key, table) in restored.catalog.tables.iter() {
            if key.0 == ns_key {
                merged_catalog.tables.insert(key.clone(), table.clone());
            }
        }

        // Indexes: same remove-then-copy replacement for the namespace.
        let index_keys: Vec<(String, String, String)> = merged_catalog
            .indexes
            .keys()
            .filter(|(ns, _, _)| ns == &ns_key)
            .cloned()
            .collect();
        for key in index_keys {
            merged_catalog.indexes.remove(&key);
        }
        for (key, index) in restored.catalog.indexes.iter() {
            if key.0 == ns_key {
                merged_catalog.indexes.insert(key.clone(), index.clone());
            }
        }

        // Async-index catalog entries: same replacement pattern.
        let async_index_keys: Vec<(String, String, String)> = merged_catalog
            .async_indexes
            .keys()
            .filter(|(ns, _, _)| ns == &ns_key)
            .cloned()
            .collect();
        for key in async_index_keys {
            merged_catalog.async_indexes.remove(&key);
        }
        for (key, index) in restored.catalog.async_indexes.iter() {
            if key.0 == ns_key {
                merged_catalog
                    .async_indexes
                    .insert(key.clone(), index.clone());
            }
        }

        // Advance to whichever seq is newer so the merge never goes backward.
        let merged_seq = live.current_seq.max(effective_target);
        let cp = write_checkpoint_with_key(
            &merged_keyspace.snapshot(),
            &merged_catalog,
            merged_seq,
            data_dir,
            config.checkpoint_key(),
            config.checkpoint_key_id.clone(),
            live.idempotency,
            config.checkpoint_compression_level,
        )?;
        let manifest = Manifest {
            durable_seq: merged_seq,
            visible_seq: merged_seq,
            active_segment_seq: merged_seq + 1,
            checkpoints: vec![cp],
            segments: Vec::new(),
        };
        write_manifest_atomic_signed(&manifest, data_dir, config.hmac_key())?;
        Ok(merged_seq)
    }
5632
5633 pub async fn snapshot_probe(&self, consistency: ConsistencyMode) -> Result<u64, AedbError> {
5634 let lease = self.acquire_snapshot(consistency).await?;
5635 Ok(lease.view.seq)
5636 }
5637
    /// Returns the commit executor's core counters.
    pub fn metrics(&self) -> ExecutorMetrics {
        self.executor.metrics()
    }
5641
    /// Aggregates executor counters, runtime head positions, and engine-local
    /// atomics into a single operational metrics snapshot.
    pub async fn operational_metrics(&self) -> OperationalMetrics {
        let core = self.executor.metrics();
        let runtime = self.executor.runtime_state_metrics().await;
        let now_micros = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_micros() as u64;
        // Age of the last full snapshot; saturates to 0 if clocks disagree.
        let snapshot_age_micros = now_micros.saturating_sub(runtime.last_full_snapshot_micros);
        // Guard the ratio when nothing has committed yet.
        let conflict_rate = if core.commits_total == 0 {
            0.0
        } else {
            core.conflict_rejections as f64 / core.commits_total as f64
        };
        let durable_wait_ops = self.durable_wait_ops.load(Ordering::Relaxed);
        let durable_wait_micros = self.durable_wait_micros.load(Ordering::Relaxed);
        // Guard the average when no durable waits have been recorded.
        let avg_durable_wait_micros = if durable_wait_ops == 0 {
            0
        } else {
            durable_wait_micros / durable_wait_ops
        };
        let upstream_validation_rejections =
            self.upstream_validation_rejections.load(Ordering::Relaxed);
        OperationalMetrics {
            commits_total: core.commits_total,
            commit_errors: core.commit_errors,
            permission_rejections: core.permission_rejections,
            // Fold rejections counted before the executor into the executor's
            // own validation counter.
            validation_rejections: core
                .validation_rejections
                .saturating_add(upstream_validation_rejections),
            queue_full_rejections: core.queue_full_rejections,
            timeout_rejections: core.timeout_rejections,
            conflict_rejections: core.conflict_rejections,
            read_set_conflicts: core.read_set_conflicts,
            conflict_rate,
            avg_commit_latency_micros: core.avg_commit_latency_micros,
            coordinator_apply_attempts: core.coordinator_apply_attempts,
            avg_coordinator_apply_micros: core.avg_coordinator_apply_micros,
            wal_append_ops: core.wal_append_ops,
            wal_append_bytes: core.wal_append_bytes,
            avg_wal_append_micros: core.avg_wal_append_micros,
            wal_sync_ops: core.wal_sync_ops,
            avg_wal_sync_micros: core.avg_wal_sync_micros,
            prestage_validate_ops: core.prestage_validate_ops,
            avg_prestage_validate_micros: core.avg_prestage_validate_micros,
            epoch_process_ops: core.epoch_process_ops,
            avg_epoch_process_micros: core.avg_epoch_process_micros,
            durable_wait_ops,
            avg_durable_wait_micros,
            inflight_commits: core.inflight_commits,
            queue_depth: core.queued_commits,
            // How far visible state is ahead of durable state.
            durable_head_lag: runtime
                .visible_head_seq
                .saturating_sub(runtime.durable_head_seq),
            visible_head_seq: runtime.visible_head_seq,
            durable_head_seq: runtime.durable_head_seq,
            current_seq: runtime.current_seq,
            snapshot_age_micros,
            startup_recovery_micros: self.startup_recovery_micros,
            startup_recovered_seq: self.startup_recovered_seq,
        }
    }
5703
5704 pub async fn estimated_memory_bytes(&self) -> usize {
5709 let (snapshot, _, _) = self.executor.snapshot_state().await;
5710 snapshot.estimate_memory_bytes()
5711 }
5712
5713 pub async fn wait_for_durable(&self, seq: u64) -> Result<(), AedbError> {
5714 self.executor.wait_for_durable(seq).await
5715 }
5716
5717 pub async fn force_fsync(&self) -> Result<u64, AedbError> {
5718 self.executor.force_fsync().await
5719 }
5720
5721 pub async fn apply_migrations(&self, mut migrations: Vec<Migration>) -> Result<(), AedbError> {
5722 migrations.sort_by_key(|m| m.version);
5723 for migration in migrations {
5724 self.apply_migration(migration).await?;
5725 }
5726 Ok(())
5727 }
5728
    /// Applies `migrations` in ascending version order, skipping versions that
    /// are already recorded, and returns a report of applied/skipped versions
    /// plus the resulting current version.
    ///
    /// All migrations must target one (project_id, scope_id) pair. A checksum
    /// mismatch against an already-applied version is an `IntegrityError`.
    pub async fn run_migrations(
        &self,
        mut migrations: Vec<Migration>,
    ) -> Result<MigrationReport, AedbError> {
        migrations.sort_by_key(|m| m.version);
        if migrations.is_empty() {
            return Ok(MigrationReport {
                applied: Vec::new(),
                skipped: Vec::new(),
                current_version: 0,
            });
        }
        // The first entry fixes the namespace; reject mixed batches.
        let project_id = migrations[0].project_id.clone();
        let scope_id = migrations[0].scope_id.clone();
        if migrations
            .iter()
            .any(|m| m.project_id != project_id || m.scope_id != scope_id)
        {
            return Err(AedbError::InvalidConfig {
                message: "all migrations in run_migrations must target the same project and scope"
                    .into(),
            });
        }
        let mut applied = Vec::new();
        let mut skipped = Vec::new();
        // Index previously applied records by version for O(1) lookups below.
        let existing = self
            .list_applied_migrations(&project_id, &scope_id)
            .await?
            .into_iter()
            .map(|record| (record.version, record))
            .collect::<HashMap<_, _>>();
        for migration in migrations {
            if let Some(record) = existing.get(&migration.version) {
                // Already applied: verify the definition has not drifted.
                let checksum = checksum_hex(&migration)?;
                if checksum != record.checksum_hex {
                    return Err(AedbError::IntegrityError {
                        message: format!(
                            "migration checksum mismatch for version {}",
                            migration.version
                        ),
                    });
                }
                skipped.push(migration.version);
                continue;
            }
            let started = Instant::now();
            let version = migration.version;
            let name = migration.name.clone();
            self.apply_migration(migration).await?;
            // Record per-migration wall time alongside version and name.
            applied.push((version, name, started.elapsed()));
        }
        let current_version = self.current_version(&project_id, &scope_id).await?;
        Ok(MigrationReport {
            applied,
            skipped,
            current_version,
        })
    }
5787
    /// Applies a single migration transactionally, recording a
    /// `MigrationRecord` under its migration key in the same commit.
    ///
    /// Idempotent: if a record for this version already exists, the call
    /// succeeds iff its stored checksum matches the migration's checksum.
    /// Commit conflicts are retried up to `MAX_RETRIES_ON_CONFLICT` times.
    pub async fn apply_migration(&self, migration: Migration) -> Result<(), AedbError> {
        const MAX_RETRIES_ON_CONFLICT: usize = 8;

        let checksum = checksum_hex(&migration)?;
        let key = migration_key(migration.version);
        for attempt in 0..=MAX_RETRIES_ON_CONFLICT {
            // Fast path: a previous attempt or a racing writer may already
            // have recorded this version.
            let (snapshot, _, _) = self.executor.snapshot_state().await;
            if let Some(existing) =
                snapshot.kv_get(&migration.project_id, &migration.scope_id, &key)
            {
                let record = decode_record(&existing.value)?;
                if record.checksum_hex != checksum {
                    return Err(AedbError::IntegrityError {
                        message: format!(
                            "migration checksum mismatch for version {}",
                            migration.version
                        ),
                    });
                }
                return Ok(());
            }

            let mut mutations = migration.mutations.clone();
            // Predict the sequence this commit will land at so the record can
            // carry it; base_seq below derives from the same prediction, so a
            // wrong prediction surfaces as a Conflict and is retried.
            let predicted_applied_seq = self.executor.current_seq().await + 1;
            let record = MigrationRecord {
                version: migration.version,
                name: migration.name.clone(),
                project_id: migration.project_id.clone(),
                scope_id: migration.scope_id.clone(),
                applied_at_micros: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_micros() as u64,
                applied_seq: predicted_applied_seq,
                checksum_hex: checksum.clone(),
            };
            // Append the bookkeeping write so data and record commit together.
            mutations.push(Mutation::KvSet {
                project_id: migration.project_id.clone(),
                scope_id: migration.scope_id.clone(),
                key: key.clone(),
                value: encode_record(&record)?,
            });
            let base_seq = predicted_applied_seq.saturating_sub(1);
            match self
                .commit_envelope(TransactionEnvelope {
                    caller: None,
                    idempotency_key: None,
                    write_class: crate::commit::tx::WriteClass::Standard,
                    assertions: Vec::new(),
                    read_set: crate::commit::tx::ReadSet::default(),
                    write_intent: crate::commit::tx::WriteIntent { mutations },
                    base_seq,
                })
                .await
            {
                Ok(_) => return Ok(()),
                Err(AedbError::Conflict(_)) if attempt < MAX_RETRIES_ON_CONFLICT => continue,
                Err(err) => {
                    // On a non-conflict error, re-check: a racing writer may
                    // have applied the same migration, which counts as success
                    // when the checksums agree.
                    let (snapshot, _, _) = self.executor.snapshot_state().await;
                    if let Some(existing) =
                        snapshot.kv_get(&migration.project_id, &migration.scope_id, &key)
                    {
                        let record = decode_record(&existing.value)?;
                        if record.checksum_hex != checksum {
                            return Err(AedbError::IntegrityError {
                                message: format!(
                                    "migration checksum mismatch for version {}",
                                    migration.version
                                ),
                            });
                        }
                        return Ok(());
                    }
                    return Err(err);
                }
            }
        }

        // Every retry budgeted above ended in a conflict.
        Err(AedbError::Conflict(format!(
            "migration {} could not be applied due to repeated conflicts",
            migration.version
        )))
    }
5871
5872 pub async fn list_applied_migrations(
5873 &self,
5874 project_id: &str,
5875 scope_id: &str,
5876 ) -> Result<Vec<MigrationRecord>, AedbError> {
5877 let (snapshot, _, _) = self.executor.snapshot_state().await;
5878 let ns = NamespaceId::project_scope(project_id, scope_id);
5879 let mut out = Vec::new();
5880 if let Some(namespace) = snapshot.namespaces.get(&ns) {
5881 for (k, v) in &namespace.kv.entries {
5882 if k.starts_with(b"__migrations/") {
5883 out.push(decode_record(&v.value)?);
5884 }
5885 }
5886 }
5887 out.sort_by_key(|r| r.version);
5888 Ok(out)
5889 }
5890
5891 pub async fn applied_migrations(
5892 &self,
5893 project_id: &str,
5894 scope_id: &str,
5895 ) -> Result<Vec<MigrationRecord>, AedbError> {
5896 self.list_applied_migrations(project_id, scope_id).await
5897 }
5898
5899 pub async fn current_version(
5900 &self,
5901 project_id: &str,
5902 scope_id: &str,
5903 ) -> Result<u64, AedbError> {
5904 Ok(self
5905 .list_applied_migrations(project_id, scope_id)
5906 .await?
5907 .last()
5908 .map_or(0, |record| record.version))
5909 }
5910
    /// Rolls back applied migrations newest-first until reaching
    /// `target_version_inclusive`, which itself is kept.
    ///
    /// Each step commits the migration's `down_mutations` together with
    /// deletion of its migration record. A missing definition in
    /// `available_migrations` or a missing `down_mutations` aborts with a
    /// validation error, leaving earlier (higher-version) rollbacks applied.
    pub async fn rollback_to_migration(
        &self,
        project_id: &str,
        scope_id: &str,
        target_version_inclusive: u64,
        mut available_migrations: Vec<Migration>,
    ) -> Result<(), AedbError> {
        let applied = self.list_applied_migrations(project_id, scope_id).await?;
        available_migrations.sort_by_key(|m| m.version);
        // Walk applied records from highest version down to the target.
        for record in applied.into_iter().rev() {
            if record.version <= target_version_inclusive {
                break;
            }
            let migration = available_migrations
                .iter()
                .find(|m| m.version == record.version)
                .ok_or_else(|| {
                    AedbError::Validation(format!(
                        "rollback migration definition missing for version {}",
                        record.version
                    ))
                })?;
            let Some(down) = &migration.down_mutations else {
                return Err(AedbError::Validation(format!(
                    "migration {} has no down_mutations",
                    record.version
                )));
            };
            let mut mutations = down.clone();
            // Drop the applied-migration record in the same commit as the
            // down mutations so bookkeeping cannot diverge from data.
            mutations.push(Mutation::KvDel {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                key: migration_key(record.version),
            });
            let base_seq = self.executor.current_seq().await;
            self.commit_envelope(TransactionEnvelope {
                caller: None,
                idempotency_key: None,
                write_class: crate::commit::tx::WriteClass::Standard,
                assertions: Vec::new(),
                read_set: crate::commit::tx::ReadSet::default(),
                write_intent: crate::commit::tx::WriteIntent { mutations },
                base_seq,
            })
            .await?;
        }
        Ok(())
    }
5959
5960 pub async fn backfill_table(
5961 &self,
5962 project_id: &str,
5963 scope_id: &str,
5964 table_name: &str,
5965 update: fn(&crate::catalog::types::Row) -> Option<crate::catalog::types::Row>,
5966 ) -> Result<u64, AedbError> {
5967 self.backfill_table_batched(project_id, scope_id, table_name, usize::MAX, update)
5968 .await
5969 }
5970
    /// Runs `update` over every row of `table_name` taken from a single
    /// snapshot, committing each returned replacement row as an upsert.
    /// Returns the number of rows rewritten. Rows for which `update` returns
    /// `None` are left untouched.
    ///
    /// `batch_size` (clamped to >= 1) only controls chunking of the in-memory
    /// row list; commits are still issued one per updated row.
    pub async fn backfill_table_batched(
        &self,
        project_id: &str,
        scope_id: &str,
        table_name: &str,
        batch_size: usize,
        update: fn(&crate::catalog::types::Row) -> Option<crate::catalog::types::Row>,
    ) -> Result<u64, AedbError> {
        let batch_size = batch_size.max(1);
        let (snapshot, catalog, _) = self.executor.snapshot_state().await;
        let schema = catalog
            .tables
            .get(&(namespace_key(project_id, scope_id), table_name.to_string()))
            .ok_or_else(|| AedbError::Validation("table not found".into()))?
            .clone();
        let table = snapshot
            .table(project_id, scope_id, table_name)
            .ok_or_else(|| AedbError::Validation("table not found".into()))?;
        // Materialize the rows so the iteration is stable while commits below
        // advance the live state.
        let rows: Vec<crate::catalog::types::Row> = table.rows.values().cloned().collect();
        let mut updated = 0u64;
        for chunk in rows.chunks(batch_size) {
            for row in chunk {
                if let Some(new_row) = update(row) {
                    // Extract primary-key values from the updated row in
                    // schema PK-column order; a PK column absent from the
                    // schema's column list is a validation error.
                    let primary_key = schema
                        .primary_key
                        .iter()
                        .map(|pk_name| {
                            let column_index = schema
                                .columns
                                .iter()
                                .position(|c| c.name == *pk_name)
                                .ok_or_else(|| {
                                    AedbError::Validation(format!(
                                        "primary key column missing: {pk_name}"
                                    ))
                                })?;
                            Ok(new_row.values[column_index].clone())
                        })
                        .collect::<Result<Vec<_>, AedbError>>()?;
                    self.commit(Mutation::Upsert {
                        project_id: project_id.to_string(),
                        scope_id: scope_id.to_string(),
                        table_name: table_name.to_string(),
                        primary_key,
                        row: new_row,
                    })
                    .await?;
                    updated += 1;
                }
            }
        }
        Ok(updated)
    }
6024
    /// Like `backfill_table_batched`, but persists a resume offset under
    /// `progress_key` after every chunk so an interrupted backfill can be
    /// restarted without rewriting earlier rows.
    ///
    /// The stored value is a decimal row offset into the snapshot's
    /// materialized row list. Returns the number of rows rewritten during
    /// this call only (rows skipped via the stored offset are not counted).
    pub async fn backfill_table_batched_resumable(
        &self,
        project_id: &str,
        scope_id: &str,
        table_name: &str,
        batch_size: usize,
        progress_key: &[u8],
        update: fn(&crate::catalog::types::Row) -> Option<crate::catalog::types::Row>,
    ) -> Result<u64, AedbError> {
        let batch_size = batch_size.max(1);
        let (snapshot, catalog, _) = self.executor.snapshot_state().await;
        let schema = catalog
            .tables
            .get(&(namespace_key(project_id, scope_id), table_name.to_string()))
            .ok_or_else(|| AedbError::Validation("table not found".into()))?
            .clone();
        let table = snapshot
            .table(project_id, scope_id, table_name)
            .ok_or_else(|| AedbError::Validation("table not found".into()))?;
        let rows: Vec<crate::catalog::types::Row> = table.rows.values().cloned().collect();
        // A missing or unparseable progress value restarts from offset 0.
        let start_offset = snapshot
            .kv_get(project_id, scope_id, progress_key)
            .and_then(|e| std::str::from_utf8(&e.value).ok()?.parse::<usize>().ok())
            .unwrap_or(0);
        let mut updated = 0u64;
        // Clamp so a stale offset past the end cannot panic the slice below.
        let start_offset = start_offset.min(rows.len());
        for (chunk_index, chunk) in rows[start_offset..].chunks(batch_size).enumerate() {
            for row in chunk {
                if let Some(new_row) = update(row) {
                    // Extract primary-key values from the updated row in
                    // schema PK-column order.
                    let primary_key = schema
                        .primary_key
                        .iter()
                        .map(|pk_name| {
                            let column_index = schema
                                .columns
                                .iter()
                                .position(|c| c.name == *pk_name)
                                .ok_or_else(|| {
                                    AedbError::Validation(format!(
                                        "primary key column missing: {pk_name}"
                                    ))
                                })?;
                            Ok(new_row.values[column_index].clone())
                        })
                        .collect::<Result<Vec<_>, AedbError>>()?;
                    self.commit(Mutation::Upsert {
                        project_id: project_id.to_string(),
                        scope_id: scope_id.to_string(),
                        table_name: table_name.to_string(),
                        primary_key,
                        row: new_row,
                    })
                    .await?;
                    updated += 1;
                }
            }
            // Absolute offset reached after this chunk, capped at the number
            // of rows actually remaining past start_offset.
            let progressed =
                start_offset + ((chunk_index + 1) * batch_size).min(rows.len() - start_offset);
            self.commit(Mutation::KvSet {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                key: progress_key.to_vec(),
                value: progressed.to_string().into_bytes(),
            })
            .await?;
        }
        Ok(updated)
    }
6093
6094 pub async fn head_state(&self) -> crate::commit::executor::HeadState {
6095 self.executor.head_state().await
6096 }
6097}
6098
impl ReadTx<'_> {
    /// Sequence number of the snapshot this read transaction is pinned to.
    pub fn snapshot_seq(&self) -> u64 {
        self.lease.view.seq
    }

    /// Runs `query` against the pinned snapshot with default options.
    pub async fn query(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
    ) -> Result<QueryResult, QueryError> {
        self.query_with_options(project_id, scope_id, query, QueryOptions::default())
            .await
    }

    /// Runs `query` with caller-supplied options. The consistency option is
    /// always overridden to this transaction's snapshot sequence, and query
    /// telemetry is emitted regardless of success or failure.
    pub async fn query_with_options(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
        mut options: QueryOptions,
    ) -> Result<QueryResult, QueryError> {
        // Pin reads to this transaction's snapshot no matter what was passed.
        options.consistency = ConsistencyMode::AtSeq(self.lease.view.seq);
        let started = Instant::now();
        // Clone the table name before `query` is moved into execution, for
        // use in telemetry below.
        let table = query.table.clone();
        let result = execute_query_against_view(
            &self.lease.view,
            project_id,
            scope_id,
            query,
            &options,
            self.caller.as_ref(),
            self.db._config.max_scan_rows,
        );
        self.db.emit_query_telemetry(
            started,
            project_id,
            scope_id,
            &table,
            self.lease.view.seq,
            &result,
        );
        result
    }

    /// Executes `items` sequentially against the same snapshot, failing fast
    /// on the first error.
    pub async fn query_batch(
        &self,
        project_id: &str,
        scope_id: &str,
        items: Vec<QueryBatchItem>,
    ) -> Result<Vec<QueryResult>, QueryError> {
        if items.is_empty() {
            return Ok(Vec::new());
        }
        let mut out = Vec::with_capacity(items.len());
        for item in items {
            out.push(
                self.query_with_options(project_id, scope_id, item.query, item.options)
                    .await?,
            );
        }
        Ok(out)
    }

    /// True if `query` matches at least one row; evaluated with `limit = 1`
    /// so at most one row is materialized.
    pub async fn exists(
        &self,
        project_id: &str,
        scope_id: &str,
        mut query: Query,
    ) -> Result<bool, QueryError> {
        query.limit = Some(1);
        let result = self.query(project_id, scope_id, query).await?;
        Ok(!result.rows.is_empty())
    }

    /// Produces planner diagnostics for `query` against the pinned snapshot
    /// without executing it.
    pub fn explain(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
        mut options: QueryOptions,
    ) -> Result<QueryDiagnostics, QueryError> {
        options.consistency = ConsistencyMode::AtSeq(self.lease.view.seq);
        explain_query_against_view(
            &self.lease.view,
            project_id,
            scope_id,
            query,
            &options,
            self.caller.as_ref(),
            self.db._config.max_scan_rows,
        )
    }

    /// Runs `explain` and then the query itself with the same options,
    /// returning both the result and its diagnostics.
    pub async fn query_with_diagnostics(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
        options: QueryOptions,
    ) -> Result<QueryWithDiagnosticsResult, QueryError> {
        let diagnostics = self.explain(project_id, scope_id, query.clone(), options.clone())?;
        let result = self
            .query_with_options(project_id, scope_id, query, options)
            .await?;
        Ok(QueryWithDiagnosticsResult {
            result,
            diagnostics,
        })
    }

    /// Cursor-based page fetch with a stable order enforced from the catalog
    /// (via `ensure_stable_order_from_catalog`), pinned to this snapshot.
    /// `page_size` is clamped to at least 1.
    pub async fn query_page_stable(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
        cursor: Option<String>,
        page_size: usize,
    ) -> Result<QueryResult, QueryError> {
        let query =
            ensure_stable_order_from_catalog(project_id, scope_id, &self.lease.view.catalog, query);
        self.query_with_options(
            project_id,
            scope_id,
            query.limit(page_size.max(1)),
            QueryOptions {
                consistency: ConsistencyMode::AtSeq(self.lease.view.seq),
                cursor,
                ..QueryOptions::default()
            },
        )
        .await
    }

    /// Fetches one page of results plus the total match count for the same
    /// predicate. Rejects aggregate queries. With `offset` set, paging is done
    /// by skipping over a full (unlimited) result and no cursor is returned;
    /// otherwise cursor-based stable paging is used.
    pub async fn list_with_total(
        &self,
        project_id: &str,
        scope_id: &str,
        mut query: Query,
        cursor: Option<String>,
        offset: Option<usize>,
        page_size: usize,
    ) -> Result<ListPageResult, QueryError> {
        if !query.aggregates.is_empty() || !query.group_by.is_empty() || query.having.is_some() {
            return Err(QueryError::InvalidQuery {
                reason: "list_with_total expects non-aggregate query".into(),
            });
        }
        query =
            ensure_stable_order_from_catalog(project_id, scope_id, &self.lease.view.catalog, query);
        // Counting variant of the same query: same table/joins/predicate,
        // but a single Count aggregate and no ordering or limit.
        let count_query = Query {
            select: vec!["count_star".into()],
            table: query.table.clone(),
            table_alias: query.table_alias.clone(),
            joins: query.joins.clone(),
            predicate: query.predicate.clone(),
            order_by: Vec::new(),
            limit: None,
            group_by: Vec::new(),
            aggregates: vec![crate::query::plan::Aggregate::Count],
            having: None,
            use_index: query.use_index.clone(),
        };
        let count_result = self
            .query_with_options(
                project_id,
                scope_id,
                count_query,
                QueryOptions {
                    allow_full_scan: true,
                    ..QueryOptions::default()
                },
            )
            .await?;
        // First value of the first row holds the count; anything else
        // (no rows, non-integer, negative) degrades to 0.
        let total_count = count_result
            .rows
            .first()
            .and_then(|row| row.values.first())
            .and_then(|value| match value {
                Value::Integer(v) => usize::try_from(*v).ok(),
                _ => None,
            })
            .unwrap_or(0);

        if let Some(offset) = offset {
            // Offset paging: fetch the full ordered result, then skip/take.
            let mut full_query = query.clone();
            full_query.limit = None;
            let full = self
                .query_with_options(
                    project_id,
                    scope_id,
                    full_query,
                    QueryOptions {
                        allow_full_scan: true,
                        ..QueryOptions::default()
                    },
                )
                .await?;
            let rows = full
                .rows
                .into_iter()
                .skip(offset)
                .take(page_size.max(1))
                .collect::<Vec<_>>();
            return Ok(ListPageResult {
                rows,
                total_count,
                next_cursor: None,
                snapshot_seq: full.snapshot_seq,
                rows_examined: full.rows_examined,
            });
        }

        // Cursor paging path.
        let page = self
            .query_page_stable(project_id, scope_id, query, cursor, page_size)
            .await?;
        Ok(ListPageResult {
            rows: page.rows,
            total_count,
            next_cursor: page.cursor,
            snapshot_seq: page.snapshot_seq,
            rows_examined: page.rows_examined,
        })
    }

    /// Two-phase read: runs `source_query`, collects the value at
    /// `source_key_index` from each source row, then fetches all matching
    /// rows from `hydrate_query`'s table where `hydrate_key_column` is IN
    /// those values, draining the hydrate cursor to completion.
    /// Returns `(source, hydrated)` results at the same snapshot.
    pub async fn lookup_then_hydrate(
        &self,
        project_id: &str,
        scope_id: &str,
        source_query: Query,
        source_key_index: usize,
        hydrate_query: Query,
        hydrate_key_column: &str,
    ) -> Result<(QueryResult, QueryResult), QueryError> {
        let mut source = self.query(project_id, scope_id, source_query).await?;
        // The source result is returned as a complete set; clear pagination
        // markers from its single fetch.
        source.cursor = None;
        source.truncated = false;
        let keys = source
            .rows
            .iter()
            .filter_map(|row| row.values.get(source_key_index).cloned())
            .collect::<Vec<_>>();
        if keys.is_empty() {
            // Nothing to hydrate: return an empty result at this snapshot.
            return Ok((
                source,
                QueryResult {
                    rows: Vec::new(),
                    rows_examined: 0,
                    cursor: None,
                    truncated: false,
                    snapshot_seq: self.lease.view.seq,
                    materialized_seq: None,
                },
            ));
        }
        // AND the IN-filter onto any predicate the hydrate query already had.
        let predicate = Expr::In(hydrate_key_column.to_string(), keys);
        let hydrate_query = Query {
            predicate: Some(match hydrate_query.predicate {
                Some(existing) => Expr::And(Box::new(existing), Box::new(predicate)),
                None => predicate,
            }),
            ..hydrate_query
        };
        // Page size bounded by max_scan_rows, capped at 100, floor of 1.
        let page_size = self.db._config.max_scan_rows.min(100).max(1);
        let mut hydrate_query = ensure_stable_order_from_catalog(
            project_id,
            scope_id,
            &self.lease.view.catalog,
            hydrate_query,
        );
        hydrate_query.limit = Some(page_size);

        let mut all_rows = Vec::new();
        let mut total_rows_examined = 0usize;
        let mut next_cursor: Option<String> = None;
        let mut materialized_seq = None;
        // Follow the cursor until the hydrate result is exhausted.
        loop {
            let page = self
                .query_with_options(
                    project_id,
                    scope_id,
                    hydrate_query.clone(),
                    QueryOptions {
                        consistency: ConsistencyMode::AtSeq(self.lease.view.seq),
                        cursor: next_cursor.clone(),
                        ..QueryOptions::default()
                    },
                )
                .await?;
            total_rows_examined = total_rows_examined.saturating_add(page.rows_examined);
            if materialized_seq.is_none() {
                materialized_seq = page.materialized_seq;
            }
            all_rows.extend(page.rows);
            if let Some(cursor) = page.cursor {
                next_cursor = Some(cursor);
                continue;
            }
            break;
        }
        let hydrated = QueryResult {
            rows: all_rows,
            rows_examined: total_rows_examined,
            cursor: None,
            truncated: false,
            snapshot_seq: self.lease.view.seq,
            materialized_seq,
        };
        Ok((source, hydrated))
    }
}