//! aedb — crate root (`lib.rs`): module declarations, the `AedbInstance`
//! facade, and its supporting request/result types.
1pub mod backup;
2pub mod catalog;
3pub mod checkpoint;
4pub mod commit;
5pub mod config;
6pub mod declarative;
7pub mod error;
8mod lib_helpers;
9#[cfg(test)]
10mod lib_tests;
11pub mod manifest;
12pub mod migration;
13pub mod offline;
14pub mod order_book;
15pub mod permission;
16pub mod preflight;
17pub mod query;
18pub mod recovery;
19pub mod repository;
20pub mod snapshot;
21pub mod storage;
22pub mod sync_bridge;
23pub mod version_store;
24pub mod wal;
25
26use crate::backup::{
27    BackupManifest, extract_backup_archive, load_backup_manifest, resolve_backup_path,
28    sha256_file_hex, verify_backup_files, write_backup_archive, write_backup_manifest,
29};
30use crate::catalog::namespace_key;
31use crate::catalog::schema::{AsyncIndexDef, IndexDef, TableSchema};
32use crate::catalog::types::{Row, Value};
33use crate::catalog::{DdlOperation, ResourceType};
34use crate::checkpoint::loader::load_checkpoint_with_key;
35use crate::checkpoint::writer::write_checkpoint_with_key;
36use crate::commit::executor::{CommitExecutor, CommitResult, ExecutorMetrics};
37use crate::commit::tx::{
38    ReadKey, ReadSet, ReadSetEntry, TransactionEnvelope, WriteClass, WriteIntent,
39};
40use crate::commit::validation::{
41    Mutation, TableUpdateExpr, validate_mutation_with_config, validate_permissions,
42};
43use crate::config::{AedbConfig, DurabilityMode, RecoveryMode};
44use crate::error::AedbError;
45use crate::error::ResourceType as ErrorResourceType;
46use crate::lib_helpers::*;
47use crate::manifest::atomic::write_manifest_atomic_signed;
48use crate::manifest::schema::{Manifest, SegmentMeta};
49use crate::migration::{
50    Migration, MigrationRecord, checksum_hex, decode_record, encode_record, migration_key,
51};
52use crate::order_book::{
53    ExecInstruction, FillSpec, InstrumentConfig, OrderBookDepth, OrderBookTableMode, OrderRecord,
54    OrderRequest, OrderSide, OrderType, Spread, TimeInForce, key_client_id, key_order,
55    read_last_execution_report, read_open_orders, read_order_status, read_recent_trades,
56    read_spread, read_top_n, scoped_instrument, u256_from_be,
57};
58use crate::permission::{CallerContext, Permission};
59use crate::preflight::{PreflightResult, preflight, preflight_plan};
60use crate::query::error::QueryError;
61use crate::query::executor::{QueryResult, execute_query_with_options};
62use crate::query::plan::{ConsistencyMode, Expr, Order, Query, QueryOptions};
63use crate::query::planner::{ExecutionStage, build_physical_plan};
64use crate::query::{KvCursor, KvScanResult, ScopedKvEntry};
65use crate::recovery::replay::replay_segments;
66use crate::recovery::{recover_at_seq_with_config, recover_with_config};
67use crate::snapshot::gc::{SnapshotHandle, SnapshotManager};
68use crate::snapshot::reader::SnapshotReadView;
69use crate::storage::encoded_key::EncodedKey;
70use crate::storage::keyspace::{Keyspace, KvEntry, NamespaceId};
71use crate::wal::frame::{FrameError, FrameReader};
72use crate::wal::segment::{SEGMENT_HEADER_SIZE, SegmentHeader};
73use parking_lot::Mutex;
74use serde::{Deserialize, Serialize};
75use std::collections::{BTreeSet, HashMap, VecDeque};
76use std::fs;
77use std::fs::File;
78use std::io::{BufReader, Read};
79use std::ops::Bound;
80use std::path::{Path, PathBuf};
81use std::sync::Arc;
82use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
83use std::time::{Duration, Instant};
84use tokio::sync::Mutex as AsyncMutex;
85use tracing::{info, warn};
86
/// File name (inside the data directory) of the persistent trust-mode marker.
const TRUST_MODE_MARKER_FILE: &str = "trust_mode.json";
88
/// Sticky, persisted record of whether this data directory has ever been
/// opened with weakened integrity settings. Once a flag is set it is never
/// cleared, which lets strict opens refuse previously-degraded directories.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct TrustModeMarker {
    /// Set once the directory has been opened with non-strict recovery.
    #[serde(default)]
    ever_non_strict_recovery: bool,
    /// Set once the directory has been opened with hash-chain checks disabled.
    #[serde(default)]
    ever_hash_chain_disabled: bool,
}
96
97fn trust_mode_marker_path(dir: &Path) -> PathBuf {
98    dir.join(TRUST_MODE_MARKER_FILE)
99}
100
101fn load_trust_mode_marker(dir: &Path) -> Result<Option<TrustModeMarker>, AedbError> {
102    let path = trust_mode_marker_path(dir);
103    if !path.exists() {
104        return Ok(None);
105    }
106    let bytes = fs::read(&path)?;
107    let marker: TrustModeMarker =
108        serde_json::from_slice(&bytes).map_err(|e| AedbError::Validation(e.to_string()))?;
109    Ok(Some(marker))
110}
111
112fn persist_trust_mode_marker(dir: &Path, marker: &TrustModeMarker) -> Result<(), AedbError> {
113    let bytes = serde_json::to_vec(marker).map_err(|e| AedbError::Encode(e.to_string()))?;
114    fs::write(trust_mode_marker_path(dir), bytes)?;
115    Ok(())
116}
117
118fn enforce_and_record_trust_mode(dir: &Path, config: &AedbConfig) -> Result<(), AedbError> {
119    let mut marker = load_trust_mode_marker(dir)?.unwrap_or_default();
120    if config.strict_recovery()
121        && (marker.ever_non_strict_recovery || marker.ever_hash_chain_disabled)
122    {
123        return Err(AedbError::Validation(
124            "strict open denied: data directory was previously opened with non-strict recovery or hash-chain disabled"
125                .into(),
126        ));
127    }
128
129    let mut changed = false;
130    if !config.strict_recovery() && !marker.ever_non_strict_recovery {
131        marker.ever_non_strict_recovery = true;
132        changed = true;
133    }
134    if !config.hash_chain_required && !marker.ever_hash_chain_disabled {
135        marker.ever_hash_chain_disabled = true;
136        changed = true;
137    }
138    if changed {
139        persist_trust_mode_marker(dir, &marker)?;
140    }
141    Ok(())
142}
143
144/// Creates a directory with restrictive permissions (0o700 on Unix) to prevent
145/// unauthorized access to database files on multi-user systems.
146fn create_private_dir_all(path: &Path) -> Result<(), AedbError> {
147    #[cfg(unix)]
148    {
149        use std::fs::DirBuilder;
150        use std::os::unix::fs::DirBuilderExt;
151        use std::os::unix::fs::PermissionsExt;
152
153        DirBuilder::new()
154            .recursive(true)
155            .mode(0o700) // Owner read/write/execute only
156            .create(path)?;
157        let metadata = fs::metadata(path)?;
158        if !metadata.is_dir() {
159            return Err(AedbError::Validation(format!(
160                "path is not a directory: {}",
161                path.display()
162            )));
163        }
164        let mut perms = metadata.permissions();
165        if perms.mode() != 0o700 {
166            perms.set_mode(0o700);
167            fs::set_permissions(path, perms)?;
168        }
169    }
170    #[cfg(not(unix))]
171    {
172        fs::create_dir_all(path)?;
173    }
174    Ok(())
175}
176
/// A single open AEDB database bound to one data directory: owns the commit
/// executor, snapshot manager, caches, registered hooks, and startup/runtime
/// counters.
pub struct AedbInstance {
    _config: AedbConfig,
    /// When true (secure/production opens), unauthenticated commit paths are
    /// rejected and callers must use the `*_as` variants.
    require_authenticated_calls: bool,
    /// Data directory this instance was opened on.
    dir: PathBuf,
    executor: CommitExecutor,
    /// Serializes checkpoint writers without blocking commit/query traffic.
    checkpoint_lock: Arc<AsyncMutex<()>>,
    snapshot_manager: Arc<Mutex<SnapshotManager>>,
    /// Cache of point-in-time recovery views (see `RecoveryCache`).
    recovery_cache: Arc<Mutex<RecoveryCache>>,
    lifecycle_hooks: Arc<Mutex<Vec<Arc<dyn LifecycleHook>>>>,
    telemetry_hooks: Arc<Mutex<Vec<Arc<dyn QueryCommitTelemetryHook>>>>,
    // Presumably counts mutations rejected before reaching the executor —
    // TODO(review): confirm against the increment sites.
    upstream_validation_rejections: Arc<AtomicU64>,
    // Aggregate counters for durable-finality waits (ops and total micros).
    durable_wait_ops: Arc<AtomicU64>,
    durable_wait_micros: Arc<AtomicU64>,
    durable_ack_fsync_leader: Arc<AtomicBool>,
    /// Microseconds spent in recovery during open.
    startup_recovery_micros: u64,
    /// Sequence recovered at startup (0 for a fresh directory).
    startup_recovered_seq: u64,
}
195
/// How long a commit call should wait before returning to the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitFinality {
    /// Return as soon as commit is visible at the snapshot head.
    Visible,
    /// Return only after commit sequence is durable in WAL.
    Durable,
}
203
/// Point-in-time snapshot of the instance's operational counters and gauges.
/// Counters are cumulative since open; `avg_*` fields are running averages in
/// microseconds.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct OperationalMetrics {
    // Commit outcome counters.
    pub commits_total: u64,
    pub commit_errors: u64,
    pub permission_rejections: u64,
    pub validation_rejections: u64,
    pub queue_full_rejections: u64,
    pub timeout_rejections: u64,
    pub conflict_rejections: u64,
    pub read_set_conflicts: u64,
    pub conflict_rate: f64,
    pub avg_commit_latency_micros: u64,
    // Coordinator / WAL pipeline timings.
    pub coordinator_apply_attempts: u64,
    pub avg_coordinator_apply_micros: u64,
    pub wal_append_ops: u64,
    pub wal_append_bytes: u64,
    pub avg_wal_append_micros: u64,
    pub wal_sync_ops: u64,
    pub avg_wal_sync_micros: u64,
    pub prestage_validate_ops: u64,
    pub avg_prestage_validate_micros: u64,
    pub epoch_process_ops: u64,
    pub avg_epoch_process_micros: u64,
    pub durable_wait_ops: u64,
    pub avg_durable_wait_micros: u64,
    // Instantaneous gauges.
    pub inflight_commits: usize,
    pub queue_depth: usize,
    pub durable_head_lag: u64,
    pub visible_head_seq: u64,
    pub durable_head_seq: u64,
    pub current_seq: u64,
    pub snapshot_age_micros: u64,
    // Startup recovery stats (fixed at open time).
    pub startup_recovery_micros: u64,
    pub startup_recovered_seq: u64,
}
239
/// Summary of one project: identifier, scope count, and creation timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectInfo {
    pub project_id: String,
    pub scope_count: u32,
    pub created_at_micros: u64,
}
246
/// Summary of one scope within a project: table and KV-key counts plus
/// creation timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScopeInfo {
    pub scope_id: String,
    pub table_count: u32,
    pub kv_key_count: u64,
    pub created_at_micros: u64,
}
254
/// Summary of one table: column, index, and row counts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableInfo {
    pub table_name: String,
    pub column_count: u32,
    pub index_count: u32,
    pub row_count: u64,
}
262
/// Outcome of a single DDL operation: whether it changed anything (`applied`)
/// and the commit sequence it was recorded at.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DdlResult {
    pub applied: bool,
    pub seq: u64,
}
268
/// Outcome of a DDL batch: the batch's commit sequence plus one `DdlResult`
/// per operation, in execution order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DdlBatchResult {
    pub seq: u64,
    pub results: Vec<DdlResult>,
}
274
/// Ordering strategy for DDL batches: execute exactly as provided, or let the
/// engine reorder by dependencies.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DdlBatchOrderMode {
    AsProvided,
    DependencyAware,
}
280
/// Result of a mutate-where-returning operation: the commit outcome together
/// with the matched row's primary key and its before/after images.
#[derive(Debug, Clone)]
pub struct MutateWhereReturningResult {
    pub commit: CommitResult,
    pub primary_key: Vec<Value>,
    pub before: Row,
    pub after: Row,
}
288
/// Report from a migration run: `(version, name, duration)` for each applied
/// migration, versions that were skipped, and the resulting schema version.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationReport {
    pub applied: Vec<(u64, String, Duration)>,
    pub skipped: Vec<u64>,
    pub current_version: u64,
}
295
/// Catalog lifecycle events dispatched to registered `LifecycleHook`s.
/// Every variant carries the commit sequence (`seq`) at which the change
/// became visible.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum LifecycleEvent {
    ProjectCreated {
        project_id: String,
        seq: u64,
    },
    ProjectDropped {
        project_id: String,
        seq: u64,
    },
    ScopeCreated {
        project_id: String,
        scope_id: String,
        seq: u64,
    },
    ScopeDropped {
        project_id: String,
        scope_id: String,
        seq: u64,
    },
    TableCreated {
        project_id: String,
        scope_id: String,
        table_name: String,
        seq: u64,
    },
    TableDropped {
        project_id: String,
        scope_id: String,
        table_name: String,
        seq: u64,
    },
    TableAltered {
        project_id: String,
        scope_id: String,
        table_name: String,
        seq: u64,
    },
}
335
/// Observer for catalog lifecycle events. Implementations must be `Send +
/// Sync`; `on_event` is invoked synchronously for each dispatched event.
pub trait LifecycleHook: Send + Sync {
    fn on_event(&self, event: &LifecycleEvent);
}
339
/// One page of a paginated list query, including the total match count, an
/// opaque continuation cursor (if more pages exist), and scan statistics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ListPageResult {
    pub rows: Vec<Row>,
    pub total_count: usize,
    pub next_cursor: Option<String>,
    /// Snapshot sequence the page was read at.
    pub snapshot_seq: u64,
    pub rows_examined: usize,
}
348
/// Request for a paginated list with total count. Continuation is via either
/// `cursor` (opaque, from a prior `ListPageResult`) or a numeric `offset`.
#[derive(Debug, Clone)]
pub struct ListWithTotalRequest {
    pub project_id: String,
    pub scope_id: String,
    pub query: Query,
    pub cursor: Option<String>,
    pub offset: Option<usize>,
    pub page_size: usize,
    pub consistency: ConsistencyMode,
}
359
/// Two-step read request: run `source_query`, take the key at
/// `source_key_index` from each result, then hydrate full rows via
/// `hydrate_query` matched on `hydrate_key_column`.
#[derive(Debug, Clone)]
pub struct LookupThenHydrateRequest {
    pub project_id: String,
    pub scope_id: String,
    pub source_query: Query,
    pub source_key_index: usize,
    pub hydrate_query: Query,
    pub hydrate_key_column: String,
    pub consistency: ConsistencyMode,
}
370
/// Request to set literal column values on every row matching `predicate`,
/// optionally capped at `limit` rows.
#[derive(Debug, Clone)]
pub struct UpdateWhereRequest {
    pub caller: CallerContext,
    pub project_id: String,
    pub scope_id: String,
    pub table_name: String,
    pub predicate: Expr,
    /// `(column, new_value)` pairs applied to each matching row.
    pub updates: Vec<(String, Value)>,
    pub limit: Option<usize>,
}
381
/// Like `UpdateWhereRequest`, but each column is updated with a computed
/// `TableUpdateExpr` rather than a literal value.
#[derive(Debug, Clone)]
pub struct UpdateWhereExprRequest {
    pub caller: CallerContext,
    pub project_id: String,
    pub scope_id: String,
    pub table_name: String,
    pub predicate: Expr,
    /// `(column, update-expression)` pairs evaluated per matching row.
    pub updates: Vec<(String, TableUpdateExpr)>,
    pub limit: Option<usize>,
}
392
/// Request to mutate a 256-bit unsigned value stored in `column` of the row
/// identified by `primary_key`. The amount is a 32-byte big-endian integer.
#[derive(Debug, Clone)]
pub struct TableU256MutationRequest {
    pub caller: CallerContext,
    pub project_id: String,
    pub scope_id: String,
    pub table_name: String,
    pub primary_key: Vec<Value>,
    pub column: String,
    /// Big-endian 256-bit amount applied by the operation.
    pub amount_be: [u8; 32],
}
403
/// Optimistic-concurrency write: replace the row at `primary_key` with `row`
/// only if the row's current sequence still equals `expected_seq`.
#[derive(Debug, Clone)]
pub struct CompareAndSwapRequest {
    pub caller: CallerContext,
    pub project_id: String,
    pub scope_id: String,
    pub table_name: String,
    pub primary_key: Vec<Value>,
    pub row: Row,
    pub expected_seq: u64,
}
414
/// One query plus its execution options, for use in batched query APIs.
#[derive(Debug, Clone)]
pub struct QueryBatchItem {
    pub query: Query,
    pub options: QueryOptions,
}
420
/// Planner/executor diagnostics accompanying a query result: which snapshot
/// was read, scan-size estimates and limits, index selection, and a trace of
/// the physical plan.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryDiagnostics {
    pub snapshot_seq: u64,
    pub estimated_scan_rows: u64,
    pub max_scan_rows: u64,
    /// Primary index chosen for the scan, if any.
    pub index_used: Option<String>,
    pub selected_indexes: Vec<String>,
    pub predicate_evaluation_path: PredicateEvaluationPath,
    pub plan_trace: Vec<String>,
    pub stages: Vec<ExecutionStage>,
    /// True when the scan was bounded by an explicit limit or cursor.
    pub bounded_by_limit_or_cursor: bool,
    pub has_joins: bool,
}
434
/// Strategy the executor used to evaluate a query's predicate, from cheapest
/// (primary-key point lookup) to most expensive (full scan / join).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PredicateEvaluationPath {
    None,
    PrimaryKeyEqLookup,
    SecondaryIndexLookup,
    AsyncIndexProjection,
    FullScanFilter,
    JoinExecution,
}
444
/// Query result bundled with the diagnostics produced while executing it.
#[derive(Debug, Clone)]
pub struct QueryWithDiagnosticsResult {
    pub result: QueryResult,
    pub diagnostics: QueryDiagnostics,
}
450
/// A SQL transaction prepared for commit: the snapshot sequence it was
/// planned against (`base_seq`), the optional authenticated caller, and the
/// mutations to apply.
#[derive(Debug, Clone)]
pub struct SqlTransactionPlan {
    pub base_seq: u64,
    pub caller: Option<CallerContext>,
    pub mutations: Vec<Mutation>,
}
457
/// Observer for per-query and per-commit telemetry events. Both methods are
/// optional (default no-op), so implementors can subscribe to either side.
pub trait QueryCommitTelemetryHook: Send + Sync {
    fn on_query(&self, _event: &QueryTelemetryEvent) {}
    fn on_commit(&self, _event: &CommitTelemetryEvent) {}
}
462
/// Telemetry record for one query execution: target table, snapshot read,
/// rows examined, latency, and outcome.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryTelemetryEvent {
    pub project_id: String,
    pub scope_id: String,
    pub table: String,
    pub snapshot_seq: u64,
    pub rows_examined: usize,
    pub latency_micros: u64,
    pub ok: bool,
    /// Error description when `ok` is false.
    pub error: Option<String>,
}
474
/// Telemetry record for one commit attempt: the API entry point (`op`),
/// resulting sequences when successful, latency, and outcome.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommitTelemetryEvent {
    /// Static name of the commit entry point (e.g. "commit", "commit_as").
    pub op: &'static str,
    pub commit_seq: Option<u64>,
    pub durable_head_seq: Option<u64>,
    pub latency_micros: u64,
    pub ok: bool,
    /// Error description when `ok` is false.
    pub error: Option<String>,
}
484
/// Pluggable SQL front-end: translates a read-only SQL string (scoped to a
/// project/scope) into an executable `Query` plus `QueryOptions`.
pub trait ReadOnlySqlAdapter: Send + Sync {
    fn execute_read_only(
        &self,
        project_id: &str,
        scope_id: &str,
        sql: &str,
    ) -> Result<(Query, QueryOptions), QueryError>;
}
493
/// Pluggable remote-backup transport: uploads a local backup directory to a
/// URI, and materializes a backup chain from a URI into a scratch directory
/// (returning local paths to the fetched backups).
pub trait RemoteBackupAdapter: Send + Sync {
    fn store_backup_dir(&self, uri: &str, backup_dir: &Path) -> Result<(), AedbError>;
    fn materialize_backup_chain(
        &self,
        uri: &str,
        scratch_dir: &Path,
    ) -> Result<Vec<PathBuf>, AedbError>;
}
502
/// RAII lease on a snapshot: holds a read view plus the handle that pins it
/// in the manager. Dropping the lease releases the handle and triggers GC
/// (see the `Drop` impl).
struct SnapshotLease {
    manager: Arc<Mutex<SnapshotManager>>,
    handle: SnapshotHandle,
    view: SnapshotReadView,
}
508
/// A read transaction: a snapshot lease scoped to the instance's lifetime,
/// optionally carrying an authenticated caller for permission checks.
pub struct ReadTx<'a> {
    db: &'a AedbInstance,
    lease: SnapshotLease,
    caller: Option<CallerContext>,
}
514
/// Maximum number of point-in-time recovery views kept in `RecoveryCache`.
const RECOVERY_CACHE_CAPACITY: usize = 16;
/// Cached recovery views older than this are dropped on the next access.
const RECOVERY_CACHE_TTL: Duration = Duration::from_secs(60);
// Reserved caller identifier — presumably the internal system identity;
// TODO(review): confirm against usage sites.
const SYSTEM_CALLER_ID: &str = "system";
/// Upper bound on retries for mutate-where-returning operations.
const MAX_MUTATE_WHERE_RETURNING_RETRIES: usize = 16;
519
/// TTL + LRU cache of point-in-time recovery views, keyed by sequence.
/// `order` is an append-only recency log of `(seq, generation)` pairs; stale
/// log entries are detected via the per-entry generation and compacted lazily.
#[derive(Debug, Default)]
struct RecoveryCache {
    // Recency log: oldest touches at the front. May contain stale pairs.
    order: VecDeque<(u64, u64)>,
    entries: HashMap<u64, RecoveryCacheEntry>,
    // Monotonically increasing (wrapping) counter stamped on each touch.
    next_generation: u64,
}
526
/// One cached recovery view with its TTL timestamp and the generation of the
/// most recent touch (used to recognize stale `order`-log entries).
#[derive(Debug)]
struct RecoveryCacheEntry {
    view: SnapshotReadView,
    /// Last-touched time; compared against `RECOVERY_CACHE_TTL`.
    created: Instant,
    generation: u64,
}
533
534impl RecoveryCache {
535    fn get(&mut self, seq: u64) -> Option<SnapshotReadView> {
536        self.prune_expired();
537        let generation = self.bump_generation();
538        let view = {
539            let entry = self.entries.get_mut(&seq)?;
540            entry.created = Instant::now();
541            entry.generation = generation;
542            entry.view.clone()
543        };
544        self.order.push_back((seq, generation));
545        self.compact_order_if_needed();
546        Some(view)
547    }
548
549    fn put(&mut self, seq: u64, view: SnapshotReadView) {
550        self.prune_expired();
551        let generation = self.bump_generation();
552        self.entries.insert(
553            seq,
554            RecoveryCacheEntry {
555                view,
556                created: Instant::now(),
557                generation,
558            },
559        );
560        self.order.push_back((seq, generation));
561        self.evict_to_capacity();
562        self.compact_order_if_needed();
563    }
564
565    fn prune_expired(&mut self) {
566        let now = Instant::now();
567        let expired: Vec<u64> = self
568            .entries
569            .iter()
570            .filter_map(|(seq, entry)| {
571                if now.duration_since(entry.created) > RECOVERY_CACHE_TTL {
572                    Some(*seq)
573                } else {
574                    None
575                }
576            })
577            .collect();
578        for seq in expired {
579            self.entries.remove(&seq);
580        }
581        self.compact_order_if_needed();
582    }
583
584    fn bump_generation(&mut self) -> u64 {
585        let generation = self.next_generation;
586        self.next_generation = self.next_generation.wrapping_add(1);
587        generation
588    }
589
590    fn evict_to_capacity(&mut self) {
591        while self.entries.len() > RECOVERY_CACHE_CAPACITY {
592            let Some((seq, generation)) = self.order.pop_front() else {
593                break;
594            };
595            let should_remove = self
596                .entries
597                .get(&seq)
598                .map(|entry| entry.generation == generation)
599                .unwrap_or(false);
600            if should_remove {
601                self.entries.remove(&seq);
602            }
603        }
604    }
605
606    fn compact_order_if_needed(&mut self) {
607        let max_order_len = RECOVERY_CACHE_CAPACITY.saturating_mul(8);
608        if self.order.len() <= max_order_len {
609            return;
610        }
611        let mut live: Vec<(u64, u64)> = self
612            .entries
613            .iter()
614            .map(|(seq, entry)| (*seq, entry.generation))
615            .collect();
616        live.sort_by_key(|(_, generation)| *generation);
617        self.order = live.into_iter().collect();
618    }
619}
620
621impl Drop for SnapshotLease {
622    fn drop(&mut self) {
623        let mut mgr = self.manager.lock();
624        mgr.release(self.handle);
625        let _ = mgr.gc();
626    }
627}
628
629impl AedbInstance {
630    pub fn open_production(config: AedbConfig, dir: &Path) -> Result<Self, AedbError> {
631        validate_arcana_config(&config)?;
632        Self::open_internal(config, dir, true)
633    }
634
635    pub fn open_secure(config: AedbConfig, dir: &Path) -> Result<Self, AedbError> {
636        validate_secure_config(&config)?;
637        Self::open_internal(config, dir, true)
638    }
639
640    pub fn open(config: AedbConfig, dir: &Path) -> Result<Self, AedbError> {
641        Self::open_internal(config, dir, false)
642    }
643
    /// Shared open path behind `open`, `open_secure`, and `open_production`.
    ///
    /// Validates the config, secures the data directory, enforces the sticky
    /// trust-mode marker, then either recovers existing state or bootstraps a
    /// fresh keyspace/catalog before assembling the instance.
    fn open_internal(
        config: AedbConfig,
        dir: &Path,
        require_authenticated_calls: bool,
    ) -> Result<Self, AedbError> {
        validate_config(&config)?;
        // Log the full effective configuration once at startup for supportability.
        info!(
            max_segment_bytes = config.max_segment_bytes,
            max_segment_age_secs = config.max_segment_age_secs,
            durability_mode = ?config.durability_mode,
            batch_interval_ms = config.batch_interval_ms,
            batch_max_bytes = config.batch_max_bytes,
            idempotency_window_seconds = config.idempotency_window_seconds,
            idempotency_window_commits = config.idempotency_window_commits,
            max_inflight_commits = config.max_inflight_commits,
            max_commit_queue_bytes = config.max_commit_queue_bytes,
            max_transaction_bytes = config.max_transaction_bytes,
            commit_timeout_ms = config.commit_timeout_ms,
            durable_ack_coalescing_enabled = config.durable_ack_coalescing_enabled,
            durable_ack_coalesce_window_us = config.durable_ack_coalesce_window_us,
            max_snapshot_age_ms = config.max_snapshot_age_ms,
            max_concurrent_snapshots = config.max_concurrent_snapshots,
            max_scan_rows = config.max_scan_rows,
            max_kv_key_bytes = config.max_kv_key_bytes,
            max_kv_value_bytes = config.max_kv_value_bytes,
            max_memory_estimate_bytes = config.max_memory_estimate_bytes,
            epoch_max_wait_us = config.epoch_max_wait_us,
            epoch_min_commits = config.epoch_min_commits,
            epoch_max_commits = config.epoch_max_commits,
            adaptive_epoch_enabled = config.adaptive_epoch_enabled,
            adaptive_epoch_min_commits_floor = config.adaptive_epoch_min_commits_floor,
            adaptive_epoch_min_commits_ceiling = config.adaptive_epoch_min_commits_ceiling,
            adaptive_epoch_wait_us_floor = config.adaptive_epoch_wait_us_floor,
            adaptive_epoch_wait_us_ceiling = config.adaptive_epoch_wait_us_ceiling,
            adaptive_epoch_target_latency_us = config.adaptive_epoch_target_latency_us,
            parallel_apply_enabled = config.parallel_apply_enabled,
            parallel_worker_threads = config.parallel_worker_threads,
            coordinator_locking_enabled = config.coordinator_locking_enabled,
            global_unique_index_enabled = config.global_unique_index_enabled,
            partition_lock_timeout_ms = config.partition_lock_timeout_ms,
            epoch_apply_timeout_ms = config.epoch_apply_timeout_ms,
            checkpoint_encryption_enabled = config.checkpoint_encryption_key.is_some(),
            checkpoint_key_id = config.checkpoint_key_id.as_deref().unwrap_or(""),
            checkpoint_compression_level = config.checkpoint_compression_level,
            manifest_hmac_enabled = config.manifest_hmac_key.is_some(),
            recovery_mode = ?config.recovery_mode,
            hash_chain_required = config.hash_chain_required,
            primary_index_backend = ?config.primary_index_backend,
            "aedb config"
        );
        create_private_dir_all(dir)?;
        // Refuse strict opens of directories that were ever opened degraded.
        enforce_and_record_trust_mode(dir, &config)?;
        // Presence of any "*.aedb"-named artifact decides recovery vs. bootstrap.
        let has_existing = fs::read_dir(dir)?
            .filter_map(|e| e.ok())
            .any(|e| e.file_name().to_string_lossy().contains(".aedb"));

        // NOTE(review): SystemTime is not monotonic; Instant would be the safer
        // choice for measuring elapsed recovery time — confirm before changing.
        let recovery_start = std::time::SystemTime::now();
        let (executor, startup_recovered_seq) = if has_existing {
            let mut recovered = recover_with_config(dir, &config)?;
            recovered.keyspace.set_backend(config.primary_index_backend);
            // Secure/production opens guarantee a system admin exists in the catalog.
            if require_authenticated_calls {
                seed_system_global_admin(&mut recovered.catalog);
            }
            (
                CommitExecutor::with_state(
                    dir,
                    recovered.keyspace,
                    recovered.catalog,
                    recovered.current_seq,
                    // Next assignable sequence follows the recovered head.
                    recovered.current_seq + 1,
                    config.clone(),
                    recovered.idempotency,
                )?,
                recovered.current_seq,
            )
        } else {
            // Fresh directory: empty catalog/keyspace, sequences start at 0/1.
            let mut catalog = crate::catalog::Catalog::default();
            if require_authenticated_calls {
                seed_system_global_admin(&mut catalog);
            }
            (
                CommitExecutor::with_state(
                    dir,
                    crate::storage::keyspace::Keyspace::with_backend(config.primary_index_backend),
                    catalog,
                    0,
                    1,
                    config.clone(),
                    std::collections::HashMap::new(),
                )?,
                0,
            )
        };
        let startup_recovery_micros =
            recovery_start.elapsed().unwrap_or_default().as_micros() as u64;

        Ok(Self {
            _config: config,
            require_authenticated_calls,
            dir: dir.to_path_buf(),
            executor,
            checkpoint_lock: Arc::new(AsyncMutex::new(())),
            snapshot_manager: Arc::new(Mutex::new(SnapshotManager::default())),
            recovery_cache: Arc::new(Mutex::new(RecoveryCache::default())),
            lifecycle_hooks: Arc::new(Mutex::new(Vec::new())),
            telemetry_hooks: Arc::new(Mutex::new(Vec::new())),
            upstream_validation_rejections: Arc::new(AtomicU64::new(0)),
            durable_wait_ops: Arc::new(AtomicU64::new(0)),
            durable_wait_micros: Arc::new(AtomicU64::new(0)),
            durable_ack_fsync_leader: Arc::new(AtomicBool::new(false)),
            startup_recovery_micros,
            startup_recovered_seq,
        })
    }
758
759    pub async fn commit(&self, mutation: Mutation) -> Result<CommitResult, AedbError> {
760        let started = Instant::now();
761        if self.require_authenticated_calls {
762            return Err(AedbError::PermissionDenied(
763                "authenticated caller required; use commit_as in secure mode".into(),
764            ));
765        }
766        // Early size validation to prevent DoS via oversized keys/values
767        crate::commit::validation::validate_kv_sizes_early(&mutation, &self._config)?;
768
769        let result = self.executor.submit(mutation).await;
770        self.emit_commit_telemetry("commit", started, &result);
771        let result = result?;
772        self.dispatch_lifecycle_events_for_commit(result.commit_seq)
773            .await;
774        Ok(result)
775    }
776
777    async fn commit_prevalidated_internal(
778        &self,
779        op_name: &'static str,
780        mutation: Mutation,
781    ) -> Result<CommitResult, AedbError> {
782        let started = Instant::now();
783        if self.require_authenticated_calls {
784            return Err(AedbError::PermissionDenied(
785                "authenticated caller required; use commit_as in secure mode".into(),
786            ));
787        }
788        crate::commit::validation::validate_kv_sizes_early(&mutation, &self._config)?;
789        let result = self.executor.submit_prevalidated(mutation).await;
790        self.emit_commit_telemetry(op_name, started, &result);
791        result
792    }
793
794    async fn commit_prevalidated_internal_with_finality(
795        &self,
796        op_name: &'static str,
797        mutation: Mutation,
798        finality: CommitFinality,
799    ) -> Result<CommitResult, AedbError> {
800        let mut result = self.commit_prevalidated_internal(op_name, mutation).await?;
801        self.enforce_finality(&mut result, finality).await?;
802        Ok(result)
803    }
804
805    pub async fn commit_with_finality(
806        &self,
807        mutation: Mutation,
808        finality: CommitFinality,
809    ) -> Result<CommitResult, AedbError> {
810        let mut result = self.commit(mutation).await?;
811        self.enforce_finality(&mut result, finality).await?;
812        Ok(result)
813    }
814
815    pub async fn commit_with_preflight(
816        &self,
817        mutation: Mutation,
818    ) -> Result<CommitResult, AedbError> {
819        let started = Instant::now();
820        if self.require_authenticated_calls {
821            return Err(AedbError::PermissionDenied(
822                "authenticated caller required; use commit_as_with_preflight in secure mode".into(),
823            ));
824        }
825        crate::commit::validation::validate_kv_sizes_early(&mutation, &self._config)?;
826        let plan = self.preflight_plan(mutation).await;
827        let result = commit_from_preflight_plan(self, None, plan).await;
828        self.emit_commit_telemetry("commit_with_preflight", started, &result);
829        result
830    }
831
832    pub async fn upsert_batch(
833        &self,
834        project_id: &str,
835        scope_id: &str,
836        table_name: &str,
837        rows: Vec<crate::catalog::types::Row>,
838    ) -> Result<CommitResult, AedbError> {
839        self.commit(Mutation::UpsertBatch {
840            project_id: project_id.to_string(),
841            scope_id: scope_id.to_string(),
842            table_name: table_name.to_string(),
843            rows,
844        })
845        .await
846    }
847
848    pub async fn insert(
849        &self,
850        project_id: &str,
851        scope_id: &str,
852        table_name: &str,
853        primary_key: Vec<crate::catalog::types::Value>,
854        row: crate::catalog::types::Row,
855    ) -> Result<CommitResult, AedbError> {
856        self.commit(Mutation::Insert {
857            project_id: project_id.to_string(),
858            scope_id: scope_id.to_string(),
859            table_name: table_name.to_string(),
860            primary_key,
861            row,
862        })
863        .await
864    }
865
866    pub async fn insert_batch(
867        &self,
868        project_id: &str,
869        scope_id: &str,
870        table_name: &str,
871        rows: Vec<crate::catalog::types::Row>,
872    ) -> Result<CommitResult, AedbError> {
873        self.commit(Mutation::InsertBatch {
874            project_id: project_id.to_string(),
875            scope_id: scope_id.to_string(),
876            table_name: table_name.to_string(),
877            rows,
878        })
879        .await
880    }
881
882    pub async fn commit_as(
883        &self,
884        caller: CallerContext,
885        mutation: Mutation,
886    ) -> Result<CommitResult, AedbError> {
887        let started = Instant::now();
888        ensure_external_caller_allowed(&caller)?;
889        // Early size validation to prevent DoS via oversized keys/values
890        crate::commit::validation::validate_kv_sizes_early(&mutation, &self._config)?;
891
892        let result = self.executor.submit_as(Some(caller), mutation).await;
893        self.emit_commit_telemetry("commit_as", started, &result);
894        let result = result?;
895        self.dispatch_lifecycle_events_for_commit(result.commit_seq)
896            .await;
897        Ok(result)
898    }
899
900    pub async fn commit_as_with_finality(
901        &self,
902        caller: CallerContext,
903        mutation: Mutation,
904        finality: CommitFinality,
905    ) -> Result<CommitResult, AedbError> {
906        let mut result = self.commit_as(caller, mutation).await?;
907        self.enforce_finality(&mut result, finality).await?;
908        Ok(result)
909    }
910
911    pub async fn commit_as_with_preflight(
912        &self,
913        caller: CallerContext,
914        mutation: Mutation,
915    ) -> Result<CommitResult, AedbError> {
916        let started = Instant::now();
917        ensure_external_caller_allowed(&caller)?;
918        crate::commit::validation::validate_kv_sizes_early(&mutation, &self._config)?;
919        let plan = self.preflight_plan_as(&caller, mutation).await?;
920        let result = commit_from_preflight_plan(self, Some(caller), plan).await;
921        self.emit_commit_telemetry("commit_as_with_preflight", started, &result);
922        result
923    }
924
925    pub async fn commit_envelope(
926        &self,
927        envelope: TransactionEnvelope,
928    ) -> Result<CommitResult, AedbError> {
929        let started = Instant::now();
930        if self.require_authenticated_calls && envelope.caller.is_none() {
931            return Err(AedbError::PermissionDenied(
932                "authenticated caller required; provide envelope.caller in secure mode".into(),
933            ));
934        }
935        if let Some(caller) = envelope.caller.as_ref() {
936            if caller.is_internal_system() {
937                return Err(AedbError::PermissionDenied(
938                    "internal system caller requires trusted commit path".into(),
939                ));
940            }
941            ensure_external_caller_allowed(caller)?;
942        }
943        let result = self.executor.submit_envelope(envelope).await;
944        self.emit_commit_telemetry("commit_envelope", started, &result);
945        let result = result?;
946        self.dispatch_lifecycle_events_for_commit(result.commit_seq)
947            .await;
948        Ok(result)
949    }
950
951    pub async fn commit_envelope_with_finality(
952        &self,
953        envelope: TransactionEnvelope,
954        finality: CommitFinality,
955    ) -> Result<CommitResult, AedbError> {
956        let mut result = self.commit_envelope(envelope).await?;
957        self.enforce_finality(&mut result, finality).await?;
958        Ok(result)
959    }
960
    /// Blocks until `result` satisfies the requested commit `finality`.
    ///
    /// For `CommitFinality::Durable` with batch durability and ack coalescing
    /// enabled, progressively cheaper strategies are tried before forcing a
    /// WAL fsync:
    ///   1. sleep one coalescing window and re-check the durable head,
    ///   2. wait (bounded) for the periodic batch flusher to catch up,
    ///   3. if a WAL sync happened very recently, grant one more bounded wait,
    ///   4. otherwise elect a single fsync leader; all other waiters block on
    ///      the durable watermark.
    /// On success, updates `result.durable_head_seq` and the durable-wait
    /// counters used for observability.
    async fn enforce_finality(
        &self,
        result: &mut CommitResult,
        finality: CommitFinality,
    ) -> Result<(), AedbError> {
        // Only durable finality can require waiting, and only when the commit
        // is not already covered by the durable head.
        if matches!(finality, CommitFinality::Durable)
            && result.durable_head_seq < result.commit_seq
        {
            let wait_started = Instant::now();
            if self._config.durable_ack_coalescing_enabled
                && matches!(self._config.durability_mode, DurabilityMode::Batch)
            {
                // Coalescing path: many concurrent durable waiters share one
                // fsync instead of each forcing their own.
                let window_us = self._config.durable_ack_coalesce_window_us;
                if window_us > 0 {
                    tokio::time::sleep(Duration::from_micros(window_us)).await;
                }
                if self.executor.durable_head_seq_now() < result.commit_seq {
                    // Give the periodic batch flusher a chance to satisfy durable
                    // finality first; only force fsync if it misses this window.
                    let grace_wait_us = window_us.max(
                        self._config
                            .batch_interval_ms
                            .saturating_mul(1000)
                            .saturating_mul(2),
                    );
                    if grace_wait_us > 0 {
                        // Timeout results are deliberately discarded: timing out
                        // just means we fall through to the next strategy.
                        let _ = tokio::time::timeout(
                            Duration::from_micros(grace_wait_us),
                            self.wait_for_durable(result.commit_seq),
                        )
                        .await;
                    }

                    if self.executor.durable_head_seq_now() < result.commit_seq {
                        // If the WAL synced within the last grace window another
                        // flush is likely imminent; grant one more bounded wait
                        // before escalating to a forced fsync.
                        let recently_synced = grace_wait_us > 0
                            && self
                                .executor
                                .last_wal_sync_age_us()
                                .is_some_and(|age| age < grace_wait_us);
                        if recently_synced {
                            let _ = tokio::time::timeout(
                                Duration::from_micros(grace_wait_us),
                                self.wait_for_durable(result.commit_seq),
                            )
                            .await;
                        }
                    }

                    if self.executor.durable_head_seq_now() < result.commit_seq {
                        // CAS elects exactly one waiter as the fsync leader; the
                        // losers block on the durable watermark instead.
                        if self
                            .durable_ack_fsync_leader
                            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                            .is_ok()
                        {
                            // Drop guard releases leadership even if force_fsync
                            // errors out through `?` below.
                            struct LeaderGuard<'a>(&'a AtomicBool);
                            impl Drop for LeaderGuard<'_> {
                                fn drop(&mut self) {
                                    self.0.store(false, Ordering::Release);
                                }
                            }
                            let _guard = LeaderGuard(&self.durable_ack_fsync_leader);
                            let _ = self.force_fsync().await?;
                        } else {
                            self.wait_for_durable(result.commit_seq).await?;
                        }
                    }
                }
            } else {
                // Non-coalescing path: wait directly for durability.
                self.wait_for_durable(result.commit_seq).await?;
            }
            // Account the wait for observability, then refresh the durable head
            // reported back to the caller.
            self.durable_wait_ops.fetch_add(1, Ordering::Relaxed);
            self.durable_wait_micros
                .fetch_add(wait_started.elapsed().as_micros() as u64, Ordering::Relaxed);
            result.durable_head_seq = self.executor.durable_head_seq_now();
        }
        Ok(())
    }
1038
1039    async fn dispatch_lifecycle_events_for_commit(&self, commit_seq: u64) {
1040        if self.lifecycle_hooks.lock().is_empty() {
1041            return;
1042        }
1043        let events = match self.read_lifecycle_events_for_commit(commit_seq).await {
1044            Ok(events) => events,
1045            Err(err) => {
1046                warn!(commit_seq, error = ?err, "failed to read lifecycle outbox");
1047                return;
1048            }
1049        };
1050        self.dispatch_lifecycle_events(events);
1051    }
1052
    /// Reads the lifecycle events recorded in the system `lifecycle_outbox`
    /// table for a single commit sequence.
    ///
    /// Missing data is not an error: an absent table, `events` column, or row
    /// for `commit_seq` all yield an empty vec. Only a JSON decode failure
    /// surfaces as an error.
    async fn read_lifecycle_events_for_commit(
        &self,
        commit_seq: u64,
    ) -> Result<Vec<LifecycleEvent>, AedbError> {
        let ns = namespace_key(crate::catalog::SYSTEM_PROJECT_ID, "app");
        let table_key = (ns.clone(), "lifecycle_outbox".to_string());
        let (snapshot, catalog, _) = self.executor.snapshot_state().await;
        let Some(schema) = catalog.tables.get(&table_key) else {
            return Ok(Vec::new());
        };
        // Locate the payload column by name rather than fixed position so
        // schema evolution does not break the lookup.
        let Some(events_idx) = schema.columns.iter().position(|c| c.name == "events") else {
            return Ok(Vec::new());
        };
        let Some(table) =
            snapshot.table(crate::catalog::SYSTEM_PROJECT_ID, "app", "lifecycle_outbox")
        else {
            return Ok(Vec::new());
        };
        // Outbox rows are keyed by commit sequence. NOTE(review): the
        // `commit_seq as i64` cast would wrap past i64::MAX — presumably
        // unreachable in practice, but worth confirming.
        let encoded_pk = EncodedKey::from_values(&[Value::Integer(commit_seq as i64)]);
        let Some(row) = table.rows.get(&encoded_pk) else {
            return Ok(Vec::new());
        };
        let Some(Value::Json(events_json)) = row.values.get(events_idx) else {
            return Ok(Vec::new());
        };
        serde_json::from_str(events_json.as_str()).map_err(|e| AedbError::Decode(e.to_string()))
    }
1080
    /// Delivers `events` to every registered lifecycle hook.
    ///
    /// Dispatch is fire-and-forget on a spawned task so hooks never block the
    /// commit path; a panicking hook is contained by `catch_unwind` so it
    /// cannot kill the task or starve the remaining hooks.
    fn dispatch_lifecycle_events(&self, events: Vec<LifecycleEvent>) {
        if events.is_empty() {
            return;
        }
        // Clone the hook list out of the mutex so the guard is not held
        // across (or moved into) the spawned task.
        let hooks = self.lifecycle_hooks.lock().clone();
        if hooks.is_empty() {
            return;
        }
        tokio::spawn(async move {
            for event in events {
                for hook in &hooks {
                    if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        hook.on_event(&event)
                    }))
                    .is_err()
                    {
                        warn!("lifecycle hook panicked while handling event");
                    }
                }
            }
        });
    }
1103
1104    fn emit_query_telemetry(
1105        &self,
1106        started: Instant,
1107        project_id: &str,
1108        scope_id: &str,
1109        table: &str,
1110        snapshot_seq: u64,
1111        result: &Result<QueryResult, QueryError>,
1112    ) {
1113        let hooks = self.telemetry_hooks.lock().clone();
1114        if hooks.is_empty() {
1115            return;
1116        }
1117        let (rows_examined, ok, error) = match result {
1118            Ok(res) => (res.rows_examined, true, None),
1119            Err(err) => (0usize, false, Some(err.to_string())),
1120        };
1121        let event = QueryTelemetryEvent {
1122            project_id: project_id.to_string(),
1123            scope_id: scope_id.to_string(),
1124            table: table.to_string(),
1125            snapshot_seq,
1126            rows_examined,
1127            latency_micros: started.elapsed().as_micros() as u64,
1128            ok,
1129            error,
1130        };
1131        for hook in hooks {
1132            hook.on_query(&event);
1133        }
1134    }
1135
1136    fn emit_commit_telemetry(
1137        &self,
1138        op: &'static str,
1139        started: Instant,
1140        result: &Result<CommitResult, AedbError>,
1141    ) {
1142        let hooks = self.telemetry_hooks.lock().clone();
1143        if hooks.is_empty() {
1144            return;
1145        }
1146        let (commit_seq, durable_head_seq, ok, error) = match result {
1147            Ok(res) => (Some(res.commit_seq), Some(res.durable_head_seq), true, None),
1148            Err(err) => (None, None, false, Some(err.to_string())),
1149        };
1150        let event = CommitTelemetryEvent {
1151            op,
1152            commit_seq,
1153            durable_head_seq,
1154            latency_micros: started.elapsed().as_micros() as u64,
1155            ok,
1156            error,
1157        };
1158        for hook in hooks {
1159            hook.on_commit(&event);
1160        }
1161    }
1162
1163    pub async fn query(
1164        &self,
1165        project_id: &str,
1166        scope_id: &str,
1167        query: Query,
1168    ) -> Result<QueryResult, QueryError> {
1169        if self.require_authenticated_calls {
1170            return Err(QueryError::PermissionDenied {
1171                permission: "authenticated caller required in secure mode".into(),
1172                scope: "anonymous".into(),
1173            });
1174        }
1175        self.query_with_options_as(None, project_id, scope_id, query, QueryOptions::default())
1176            .await
1177    }
1178
1179    pub(crate) async fn query_unchecked(
1180        &self,
1181        project_id: &str,
1182        scope_id: &str,
1183        query: Query,
1184        options: QueryOptions,
1185    ) -> Result<QueryResult, QueryError> {
1186        self.query_with_options_as(None, project_id, scope_id, query, options)
1187            .await
1188    }
1189
1190    pub async fn query_no_auth(
1191        &self,
1192        project_id: &str,
1193        scope_id: &str,
1194        query: Query,
1195        options: QueryOptions,
1196    ) -> Result<QueryResult, QueryError> {
1197        if self.require_authenticated_calls {
1198            return Err(QueryError::PermissionDenied {
1199                permission: "query_no_auth is unavailable in secure mode".into(),
1200                scope: "anonymous".into(),
1201            });
1202        }
1203        self.query_unchecked(project_id, scope_id, query, options)
1204            .await
1205    }
1206
1207    pub async fn query_with_options(
1208        &self,
1209        project_id: &str,
1210        scope_id: &str,
1211        query: Query,
1212        options: QueryOptions,
1213    ) -> Result<QueryResult, QueryError> {
1214        if self.require_authenticated_calls {
1215            return Err(QueryError::PermissionDenied {
1216                permission: "authenticated caller required in secure mode".into(),
1217                scope: "anonymous".into(),
1218            });
1219        }
1220        self.query_with_options_as(None, project_id, scope_id, query, options)
1221            .await
1222    }
1223
    /// Core query entry point: executes `query` against a consistent snapshot
    /// on behalf of an optional `caller`.
    ///
    /// Secure mode rejects anonymous calls; a supplied caller must pass the
    /// query permission gate. Telemetry is emitted for every attempt,
    /// successful or not.
    pub async fn query_with_options_as(
        &self,
        caller: Option<&CallerContext>,
        project_id: &str,
        scope_id: &str,
        query: Query,
        mut options: QueryOptions,
    ) -> Result<QueryResult, QueryError> {
        if self.require_authenticated_calls && caller.is_none() {
            return Err(QueryError::PermissionDenied {
                permission: "authenticated caller required in secure mode".into(),
                scope: "anonymous".into(),
            });
        }
        if let Some(caller) = caller {
            ensure_query_caller_allowed(caller)?;
        }

        // Let the query's own index hint stand in when the options carry none.
        if options.async_index.is_none() {
            options.async_index = query.use_index.clone();
        }

        // A pagination cursor encodes the snapshot seq of the first page;
        // pin this page to that same seq so results stay stable.
        if let Some(cursor) = &options.cursor {
            let token = parse_cursor_seq(cursor).map_err(QueryError::from)?;
            options.consistency = ConsistencyMode::AtSeq(token);
        }

        let lease = self
            .acquire_snapshot(options.consistency)
            .await
            .map_err(QueryError::from)?;
        // Latency is measured after snapshot acquisition so it reflects
        // execution only.
        let started = Instant::now();
        let table = query.table.clone();
        let result = execute_query_against_view(
            &lease.view,
            project_id,
            scope_id,
            query,
            &options,
            caller,
            self._config.max_scan_rows,
        );
        self.emit_query_telemetry(
            started,
            project_id,
            scope_id,
            &table,
            lease.view.seq,
            &result,
        );
        result
    }
1276
1277    pub async fn begin_read_tx(
1278        &self,
1279        consistency: ConsistencyMode,
1280    ) -> Result<ReadTx<'_>, AedbError> {
1281        if self.require_authenticated_calls {
1282            return Err(AedbError::PermissionDenied(
1283                "authenticated caller required in secure mode; use begin_read_tx_as".into(),
1284            ));
1285        }
1286        let lease = self.acquire_snapshot(consistency).await?;
1287        Ok(ReadTx {
1288            db: self,
1289            lease,
1290            caller: None,
1291        })
1292    }
1293
1294    pub async fn begin_read_tx_as(
1295        &self,
1296        caller: CallerContext,
1297        consistency: ConsistencyMode,
1298    ) -> Result<ReadTx<'_>, AedbError> {
1299        ensure_query_caller_allowed(&caller).map_err(|err| match err {
1300            QueryError::PermissionDenied { permission, .. } => {
1301                AedbError::PermissionDenied(permission)
1302            }
1303            other => AedbError::Validation(other.to_string()),
1304        })?;
1305        let lease = self.acquire_snapshot(consistency).await?;
1306        Ok(ReadTx {
1307            db: self,
1308            lease,
1309            caller: Some(caller),
1310        })
1311    }
1312
    /// Pages through `query` results with stable, deterministic ordering.
    ///
    /// When the query has no joins and no explicit ordering, the target
    /// table's primary-key columns are appended as ascending sort keys so
    /// page boundaries are deterministic across requests. The page size is
    /// clamped to at least one row, and the optional `cursor` pins later
    /// pages to the snapshot of the first (via `query_with_options`).
    pub async fn query_page_stable(
        &self,
        project_id: &str,
        scope_id: &str,
        mut query: Query,
        cursor: Option<String>,
        page_size: usize,
        consistency: ConsistencyMode,
    ) -> Result<QueryResult, QueryError> {
        if query.joins.is_empty() && query.order_by.is_empty() {
            let (_, catalog, _) = self.executor.snapshot_state().await;
            // Resolve possible cross-scope table references before the
            // catalog lookup.
            let (q_project, q_scope, q_table) =
                resolve_query_table_ref(project_id, scope_id, &query.table);
            if let Some(schema) = catalog
                .tables
                .get(&(namespace_key(&q_project, &q_scope), q_table))
            {
                // PK columns give a total order, which makes paging stable.
                for pk in &schema.primary_key {
                    query = query.order_by(pk, Order::Asc);
                }
            }
        }
        query.limit = Some(page_size.max(1));
        self.query_with_options(
            project_id,
            scope_id,
            query,
            QueryOptions {
                consistency,
                cursor,
                ..QueryOptions::default()
            },
        )
        .await
    }
1348
1349    pub async fn query_batch(
1350        &self,
1351        project_id: &str,
1352        scope_id: &str,
1353        items: Vec<QueryBatchItem>,
1354        consistency: ConsistencyMode,
1355    ) -> Result<Vec<QueryResult>, QueryError> {
1356        let tx = self
1357            .begin_read_tx(consistency)
1358            .await
1359            .map_err(QueryError::from)?;
1360        tx.query_batch(project_id, scope_id, items).await
1361    }
1362
1363    pub async fn query_batch_as(
1364        &self,
1365        caller: CallerContext,
1366        project_id: &str,
1367        scope_id: &str,
1368        items: Vec<QueryBatchItem>,
1369        consistency: ConsistencyMode,
1370    ) -> Result<Vec<QueryResult>, QueryError> {
1371        let tx = self
1372            .begin_read_tx_as(caller, consistency)
1373            .await
1374            .map_err(QueryError::from)?;
1375        tx.query_batch(project_id, scope_id, items).await
1376    }
1377
1378    pub async fn exists(
1379        &self,
1380        project_id: &str,
1381        scope_id: &str,
1382        query: Query,
1383        consistency: ConsistencyMode,
1384    ) -> Result<bool, QueryError> {
1385        let tx = self
1386            .begin_read_tx(consistency)
1387            .await
1388            .map_err(QueryError::from)?;
1389        tx.exists(project_id, scope_id, query).await
1390    }
1391
1392    pub async fn exists_as(
1393        &self,
1394        caller: CallerContext,
1395        project_id: &str,
1396        scope_id: &str,
1397        query: Query,
1398        consistency: ConsistencyMode,
1399    ) -> Result<bool, QueryError> {
1400        let tx = self
1401            .begin_read_tx_as(caller, consistency)
1402            .await
1403            .map_err(QueryError::from)?;
1404        tx.exists(project_id, scope_id, query).await
1405    }
1406
1407    pub async fn explain_query(
1408        &self,
1409        project_id: &str,
1410        scope_id: &str,
1411        query: Query,
1412        options: QueryOptions,
1413    ) -> Result<QueryDiagnostics, QueryError> {
1414        self.explain_query_as(None, project_id, scope_id, query, options)
1415            .await
1416    }
1417
1418    pub async fn explain_query_as(
1419        &self,
1420        caller: Option<&CallerContext>,
1421        project_id: &str,
1422        scope_id: &str,
1423        query: Query,
1424        options: QueryOptions,
1425    ) -> Result<QueryDiagnostics, QueryError> {
1426        if self.require_authenticated_calls && caller.is_none() {
1427            return Err(QueryError::PermissionDenied {
1428                permission: "authenticated caller required in secure mode".into(),
1429                scope: "anonymous".into(),
1430            });
1431        }
1432
1433        let lease = self
1434            .acquire_snapshot(options.consistency)
1435            .await
1436            .map_err(QueryError::from)?;
1437        explain_query_against_view(
1438            &lease.view,
1439            project_id,
1440            scope_id,
1441            query,
1442            &options,
1443            caller,
1444            self._config.max_scan_rows,
1445        )
1446    }
1447
    /// Runs `query` and returns both its rows and execution diagnostics.
    ///
    /// NOTE(review): the explain and the execution each acquire their own
    /// snapshot, so under concurrent writes the diagnostics may describe a
    /// slightly different state than the result — confirm whether
    /// single-snapshot behavior is required here.
    pub async fn query_with_diagnostics(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
        options: QueryOptions,
    ) -> Result<QueryWithDiagnosticsResult, QueryError> {
        let diagnostics = self
            .explain_query(project_id, scope_id, query.clone(), options.clone())
            .await?;
        let result = self
            .query_with_options(project_id, scope_id, query, options)
            .await?;
        Ok(QueryWithDiagnosticsResult {
            result,
            diagnostics,
        })
    }
1466
    /// Caller-scoped variant of `query_with_diagnostics`.
    ///
    /// NOTE(review): as with the anonymous variant, explain and execution
    /// acquire separate snapshots; diagnostics may lag the result under
    /// concurrent writes — confirm whether that is acceptable.
    pub async fn query_with_diagnostics_as(
        &self,
        caller: &CallerContext,
        project_id: &str,
        scope_id: &str,
        query: Query,
        options: QueryOptions,
    ) -> Result<QueryWithDiagnosticsResult, QueryError> {
        let diagnostics = self
            .explain_query_as(
                Some(caller),
                project_id,
                scope_id,
                query.clone(),
                options.clone(),
            )
            .await?;
        let result = self
            .query_with_options_as(Some(caller), project_id, scope_id, query, options)
            .await?;
        Ok(QueryWithDiagnosticsResult {
            result,
            diagnostics,
        })
    }
1492
1493    pub async fn list_with_total_with(
1494        &self,
1495        request: ListWithTotalRequest,
1496    ) -> Result<ListPageResult, QueryError> {
1497        let tx = self
1498            .begin_read_tx(request.consistency)
1499            .await
1500            .map_err(QueryError::from)?;
1501        tx.list_with_total(
1502            &request.project_id,
1503            &request.scope_id,
1504            request.query,
1505            request.cursor,
1506            request.offset,
1507            request.page_size,
1508        )
1509        .await
1510    }
1511
1512    pub async fn list_with_total_as_with(
1513        &self,
1514        caller: CallerContext,
1515        request: ListWithTotalRequest,
1516    ) -> Result<ListPageResult, QueryError> {
1517        let tx = self
1518            .begin_read_tx_as(caller, request.consistency)
1519            .await
1520            .map_err(QueryError::from)?;
1521        tx.list_with_total(
1522            &request.project_id,
1523            &request.scope_id,
1524            request.query,
1525            request.cursor,
1526            request.offset,
1527            request.page_size,
1528        )
1529        .await
1530    }
1531
1532    #[allow(clippy::too_many_arguments)]
1533    pub async fn list_with_total(
1534        &self,
1535        project_id: &str,
1536        scope_id: &str,
1537        query: Query,
1538        cursor: Option<String>,
1539        offset: Option<usize>,
1540        page_size: usize,
1541        consistency: ConsistencyMode,
1542    ) -> Result<ListPageResult, QueryError> {
1543        self.list_with_total_with(ListWithTotalRequest {
1544            project_id: project_id.to_string(),
1545            scope_id: scope_id.to_string(),
1546            query,
1547            cursor,
1548            offset,
1549            page_size,
1550            consistency,
1551        })
1552        .await
1553    }
1554
1555    #[allow(clippy::too_many_arguments)]
1556    pub async fn list_with_total_as(
1557        &self,
1558        caller: CallerContext,
1559        project_id: &str,
1560        scope_id: &str,
1561        query: Query,
1562        cursor: Option<String>,
1563        offset: Option<usize>,
1564        page_size: usize,
1565        consistency: ConsistencyMode,
1566    ) -> Result<ListPageResult, QueryError> {
1567        self.list_with_total_as_with(
1568            caller,
1569            ListWithTotalRequest {
1570                project_id: project_id.to_string(),
1571                scope_id: scope_id.to_string(),
1572                query,
1573                cursor,
1574                offset,
1575                page_size,
1576                consistency,
1577            },
1578        )
1579        .await
1580    }
1581
1582    pub async fn lookup_then_hydrate_with(
1583        &self,
1584        request: LookupThenHydrateRequest,
1585    ) -> Result<(QueryResult, QueryResult), QueryError> {
1586        let tx = self
1587            .begin_read_tx(request.consistency)
1588            .await
1589            .map_err(QueryError::from)?;
1590        tx.lookup_then_hydrate(
1591            &request.project_id,
1592            &request.scope_id,
1593            request.source_query,
1594            request.source_key_index,
1595            request.hydrate_query,
1596            &request.hydrate_key_column,
1597        )
1598        .await
1599    }
1600
1601    pub async fn lookup_then_hydrate_as_with(
1602        &self,
1603        caller: CallerContext,
1604        request: LookupThenHydrateRequest,
1605    ) -> Result<(QueryResult, QueryResult), QueryError> {
1606        let tx = self
1607            .begin_read_tx_as(caller, request.consistency)
1608            .await
1609            .map_err(QueryError::from)?;
1610        tx.lookup_then_hydrate(
1611            &request.project_id,
1612            &request.scope_id,
1613            request.source_query,
1614            request.source_key_index,
1615            request.hydrate_query,
1616            &request.hydrate_key_column,
1617        )
1618        .await
1619    }
1620
1621    #[allow(clippy::too_many_arguments)]
1622    pub async fn lookup_then_hydrate(
1623        &self,
1624        project_id: &str,
1625        scope_id: &str,
1626        source_query: Query,
1627        source_key_index: usize,
1628        hydrate_query: Query,
1629        hydrate_key_column: &str,
1630        consistency: ConsistencyMode,
1631    ) -> Result<(QueryResult, QueryResult), QueryError> {
1632        self.lookup_then_hydrate_with(LookupThenHydrateRequest {
1633            project_id: project_id.to_string(),
1634            scope_id: scope_id.to_string(),
1635            source_query,
1636            source_key_index,
1637            hydrate_query,
1638            hydrate_key_column: hydrate_key_column.to_string(),
1639            consistency,
1640        })
1641        .await
1642    }
1643
1644    #[allow(clippy::too_many_arguments)]
1645    pub async fn lookup_then_hydrate_as(
1646        &self,
1647        caller: CallerContext,
1648        project_id: &str,
1649        scope_id: &str,
1650        source_query: Query,
1651        source_key_index: usize,
1652        hydrate_query: Query,
1653        hydrate_key_column: &str,
1654        consistency: ConsistencyMode,
1655    ) -> Result<(QueryResult, QueryResult), QueryError> {
1656        self.lookup_then_hydrate_as_with(
1657            caller,
1658            LookupThenHydrateRequest {
1659                project_id: project_id.to_string(),
1660                scope_id: scope_id.to_string(),
1661                source_query,
1662                source_key_index,
1663                hydrate_query,
1664                hydrate_key_column: hydrate_key_column.to_string(),
1665                consistency,
1666            },
1667        )
1668        .await
1669    }
1670
1671    pub async fn query_sql_read_only(
1672        &self,
1673        adapter: &dyn ReadOnlySqlAdapter,
1674        project_id: &str,
1675        scope_id: &str,
1676        sql: &str,
1677        consistency: ConsistencyMode,
1678    ) -> Result<QueryResult, QueryError> {
1679        let (query, mut options) = adapter.execute_read_only(project_id, scope_id, sql)?;
1680        options.consistency = consistency;
1681        options.allow_full_scan = true;
1682        self.query_with_options(project_id, scope_id, query, options)
1683            .await
1684    }
1685
1686    pub async fn query_sql_read_only_as(
1687        &self,
1688        adapter: &dyn ReadOnlySqlAdapter,
1689        caller: &CallerContext,
1690        project_id: &str,
1691        scope_id: &str,
1692        sql: &str,
1693        consistency: ConsistencyMode,
1694    ) -> Result<QueryResult, QueryError> {
1695        let (query, mut options) = adapter.execute_read_only(project_id, scope_id, sql)?;
1696        options.consistency = consistency;
1697        options.allow_full_scan = true;
1698        self.query_with_options_as(Some(caller), project_id, scope_id, query, options)
1699            .await
1700    }
1701
1702    pub async fn plan_sql_transaction(
1703        &self,
1704        consistency: ConsistencyMode,
1705        mutations: Vec<Mutation>,
1706    ) -> Result<SqlTransactionPlan, AedbError> {
1707        if mutations.is_empty() {
1708            return Err(AedbError::Validation(
1709                "sql transaction plan requires at least one mutation".into(),
1710            ));
1711        }
1712        Ok(SqlTransactionPlan {
1713            base_seq: self.snapshot_probe(consistency).await?,
1714            caller: None,
1715            mutations,
1716        })
1717    }
1718
1719    pub async fn plan_sql_transaction_as(
1720        &self,
1721        caller: CallerContext,
1722        consistency: ConsistencyMode,
1723        mutations: Vec<Mutation>,
1724    ) -> Result<SqlTransactionPlan, AedbError> {
1725        if mutations.is_empty() {
1726            return Err(AedbError::Validation(
1727                "sql transaction plan requires at least one mutation".into(),
1728            ));
1729        }
1730        ensure_external_caller_allowed(&caller)?;
1731        Ok(SqlTransactionPlan {
1732            base_seq: self.snapshot_probe(consistency).await?,
1733            caller: Some(caller),
1734            mutations,
1735        })
1736    }
1737
1738    pub async fn commit_sql_transaction_plan(
1739        &self,
1740        plan: SqlTransactionPlan,
1741    ) -> Result<CommitResult, AedbError> {
1742        self.commit_envelope(TransactionEnvelope {
1743            caller: plan.caller,
1744            idempotency_key: None,
1745            write_class: WriteClass::Standard,
1746            assertions: Vec::new(),
1747            read_set: ReadSet::default(),
1748            write_intent: WriteIntent {
1749                mutations: plan.mutations,
1750            },
1751            base_seq: plan.base_seq,
1752        })
1753        .await
1754    }
1755
1756    pub async fn commit_sql_transaction(
1757        &self,
1758        consistency: ConsistencyMode,
1759        mutations: Vec<Mutation>,
1760    ) -> Result<CommitResult, AedbError> {
1761        let plan = self.plan_sql_transaction(consistency, mutations).await?;
1762        self.commit_sql_transaction_plan(plan).await
1763    }
1764
1765    pub async fn commit_sql_transaction_as(
1766        &self,
1767        caller: CallerContext,
1768        consistency: ConsistencyMode,
1769        mutations: Vec<Mutation>,
1770    ) -> Result<CommitResult, AedbError> {
1771        let plan = self
1772            .plan_sql_transaction_as(caller, consistency, mutations)
1773            .await?;
1774        self.commit_sql_transaction_plan(plan).await
1775    }
1776
1777    pub async fn commit_many_atomic(
1778        &self,
1779        mutations: Vec<Mutation>,
1780    ) -> Result<CommitResult, AedbError> {
1781        if mutations.is_empty() {
1782            return Err(AedbError::Validation(
1783                "transaction envelope has no mutations".into(),
1784            ));
1785        }
1786        self.commit_envelope(TransactionEnvelope {
1787            caller: None,
1788            idempotency_key: None,
1789            write_class: WriteClass::Standard,
1790            assertions: Vec::new(),
1791            read_set: ReadSet::default(),
1792            write_intent: WriteIntent { mutations },
1793            // No read set/assertions in this helper path.
1794            // Keep hot-path parity with submit/submit_as and avoid snapshot acquisition.
1795            base_seq: 0,
1796        })
1797        .await
1798    }
1799
1800    pub async fn commit_many_atomic_as(
1801        &self,
1802        caller: CallerContext,
1803        mutations: Vec<Mutation>,
1804    ) -> Result<CommitResult, AedbError> {
1805        if mutations.is_empty() {
1806            return Err(AedbError::Validation(
1807                "transaction envelope has no mutations".into(),
1808            ));
1809        }
1810        self.commit_envelope(TransactionEnvelope {
1811            caller: Some(caller),
1812            idempotency_key: None,
1813            write_class: WriteClass::Standard,
1814            assertions: Vec::new(),
1815            read_set: ReadSet::default(),
1816            write_intent: WriteIntent { mutations },
1817            // No read set/assertions in this helper path.
1818            // Keep hot-path parity with submit/submit_as and avoid snapshot acquisition.
1819            base_seq: 0,
1820        })
1821        .await
1822    }
1823
1824    pub async fn delete_where(
1825        &self,
1826        project_id: &str,
1827        scope_id: &str,
1828        table_name: &str,
1829        predicate: Expr,
1830        limit: Option<usize>,
1831    ) -> Result<Option<CommitResult>, AedbError> {
1832        let result = self
1833            .commit(Mutation::DeleteWhere {
1834                project_id: project_id.to_string(),
1835                scope_id: scope_id.to_string(),
1836                table_name: table_name.to_string(),
1837                predicate,
1838                limit,
1839            })
1840            .await?;
1841        Ok(Some(result))
1842    }
1843
1844    pub async fn delete_where_as(
1845        &self,
1846        caller: CallerContext,
1847        project_id: &str,
1848        scope_id: &str,
1849        table_name: &str,
1850        predicate: Expr,
1851        limit: Option<usize>,
1852    ) -> Result<Option<CommitResult>, AedbError> {
1853        let result = self
1854            .commit_as(
1855                caller,
1856                Mutation::DeleteWhere {
1857                    project_id: project_id.to_string(),
1858                    scope_id: scope_id.to_string(),
1859                    table_name: table_name.to_string(),
1860                    predicate,
1861                    limit,
1862                },
1863            )
1864            .await?;
1865        Ok(Some(result))
1866    }
1867
1868    pub async fn update_where(
1869        &self,
1870        project_id: &str,
1871        scope_id: &str,
1872        table_name: &str,
1873        predicate: Expr,
1874        updates: Vec<(String, Value)>,
1875        limit: Option<usize>,
1876    ) -> Result<Option<CommitResult>, AedbError> {
1877        let result = self
1878            .commit(Mutation::UpdateWhere {
1879                project_id: project_id.to_string(),
1880                scope_id: scope_id.to_string(),
1881                table_name: table_name.to_string(),
1882                predicate,
1883                updates,
1884                limit,
1885            })
1886            .await?;
1887        Ok(Some(result))
1888    }
1889
1890    pub async fn update_where_as_with(
1891        &self,
1892        request: UpdateWhereRequest,
1893    ) -> Result<Option<CommitResult>, AedbError> {
1894        let result = self
1895            .commit_as(
1896                request.caller,
1897                Mutation::UpdateWhere {
1898                    project_id: request.project_id,
1899                    scope_id: request.scope_id,
1900                    table_name: request.table_name,
1901                    predicate: request.predicate,
1902                    updates: request.updates,
1903                    limit: request.limit,
1904                },
1905            )
1906            .await?;
1907        Ok(Some(result))
1908    }
1909
1910    #[allow(clippy::too_many_arguments)]
1911    pub async fn update_where_as(
1912        &self,
1913        caller: CallerContext,
1914        project_id: &str,
1915        scope_id: &str,
1916        table_name: &str,
1917        predicate: Expr,
1918        updates: Vec<(String, Value)>,
1919        limit: Option<usize>,
1920    ) -> Result<Option<CommitResult>, AedbError> {
1921        self.update_where_as_with(UpdateWhereRequest {
1922            caller,
1923            project_id: project_id.to_string(),
1924            scope_id: scope_id.to_string(),
1925            table_name: table_name.to_string(),
1926            predicate,
1927            updates,
1928            limit,
1929        })
1930        .await
1931    }
1932
1933    pub async fn update_where_expr(
1934        &self,
1935        project_id: &str,
1936        scope_id: &str,
1937        table_name: &str,
1938        predicate: Expr,
1939        updates: Vec<(String, TableUpdateExpr)>,
1940        limit: Option<usize>,
1941    ) -> Result<Option<CommitResult>, AedbError> {
1942        let result = self
1943            .commit(Mutation::UpdateWhereExpr {
1944                project_id: project_id.to_string(),
1945                scope_id: scope_id.to_string(),
1946                table_name: table_name.to_string(),
1947                predicate,
1948                updates,
1949                limit,
1950            })
1951            .await?;
1952        Ok(Some(result))
1953    }
1954
1955    pub async fn update_where_expr_as_with(
1956        &self,
1957        request: UpdateWhereExprRequest,
1958    ) -> Result<Option<CommitResult>, AedbError> {
1959        let result = self
1960            .commit_as(
1961                request.caller,
1962                Mutation::UpdateWhereExpr {
1963                    project_id: request.project_id,
1964                    scope_id: request.scope_id,
1965                    table_name: request.table_name,
1966                    predicate: request.predicate,
1967                    updates: request.updates,
1968                    limit: request.limit,
1969                },
1970            )
1971            .await?;
1972        Ok(Some(result))
1973    }
1974
1975    #[allow(clippy::too_many_arguments)]
1976    pub async fn update_where_expr_as(
1977        &self,
1978        caller: CallerContext,
1979        project_id: &str,
1980        scope_id: &str,
1981        table_name: &str,
1982        predicate: Expr,
1983        updates: Vec<(String, TableUpdateExpr)>,
1984        limit: Option<usize>,
1985    ) -> Result<Option<CommitResult>, AedbError> {
1986        self.update_where_expr_as_with(UpdateWhereExprRequest {
1987            caller,
1988            project_id: project_id.to_string(),
1989            scope_id: scope_id.to_string(),
1990            table_name: table_name.to_string(),
1991            predicate,
1992            updates,
1993            limit,
1994        })
1995        .await
1996    }
1997
1998    pub async fn mutate_where_returning(
1999        &self,
2000        project_id: &str,
2001        scope_id: &str,
2002        table_name: &str,
2003        predicate: Expr,
2004        updates: Vec<(String, TableUpdateExpr)>,
2005    ) -> Result<Option<MutateWhereReturningResult>, AedbError> {
2006        self.mutate_where_returning_inner(
2007            None, project_id, scope_id, table_name, predicate, updates,
2008        )
2009        .await
2010    }
2011
2012    pub async fn mutate_where_returning_as(
2013        &self,
2014        caller: CallerContext,
2015        project_id: &str,
2016        scope_id: &str,
2017        table_name: &str,
2018        predicate: Expr,
2019        updates: Vec<(String, TableUpdateExpr)>,
2020    ) -> Result<Option<MutateWhereReturningResult>, AedbError> {
2021        self.mutate_where_returning_inner(
2022            Some(caller),
2023            project_id,
2024            scope_id,
2025            table_name,
2026            predicate,
2027            updates,
2028        )
2029        .await
2030    }
2031
2032    pub async fn claim_one(
2033        &self,
2034        project_id: &str,
2035        scope_id: &str,
2036        table_name: &str,
2037        predicate: Expr,
2038        updates: Vec<(String, TableUpdateExpr)>,
2039    ) -> Result<Option<MutateWhereReturningResult>, AedbError> {
2040        self.mutate_where_returning(project_id, scope_id, table_name, predicate, updates)
2041            .await
2042    }
2043
2044    pub async fn claim_one_as(
2045        &self,
2046        caller: CallerContext,
2047        project_id: &str,
2048        scope_id: &str,
2049        table_name: &str,
2050        predicate: Expr,
2051        updates: Vec<(String, TableUpdateExpr)>,
2052    ) -> Result<Option<MutateWhereReturningResult>, AedbError> {
2053        self.mutate_where_returning_as(caller, project_id, scope_id, table_name, predicate, updates)
2054            .await
2055    }
2056
2057    pub async fn kv_get(
2058        &self,
2059        project_id: &str,
2060        scope_id: &str,
2061        key: &[u8],
2062        consistency: ConsistencyMode,
2063        caller: &CallerContext,
2064    ) -> Result<Option<KvEntry>, QueryError> {
2065        ensure_query_caller_allowed(caller)?;
2066        let lease = self
2067            .acquire_snapshot(consistency)
2068            .await
2069            .map_err(QueryError::from)?;
2070        let snapshot = &lease.view.keyspace;
2071        let catalog = &lease.view.catalog;
2072        if !catalog.has_kv_read_permission(&caller.caller_id, project_id, scope_id, key) {
2073            return Err(QueryError::PermissionDenied {
2074                permission: format!("KvRead({project_id}.{scope_id})"),
2075                scope: caller.caller_id.clone(),
2076            });
2077        }
2078        Ok(snapshot.kv_get(project_id, scope_id, key).cloned())
2079    }
2080
2081    pub async fn kv_get_default_scope(
2082        &self,
2083        project_id: &str,
2084        key: &[u8],
2085        consistency: ConsistencyMode,
2086        caller: &CallerContext,
2087    ) -> Result<Option<KvEntry>, QueryError> {
2088        self.kv_get(
2089            project_id,
2090            crate::catalog::DEFAULT_SCOPE_ID,
2091            key,
2092            consistency,
2093            caller,
2094        )
2095        .await
2096    }
2097
2098    pub(crate) async fn kv_get_unchecked(
2099        &self,
2100        project_id: &str,
2101        scope_id: &str,
2102        key: &[u8],
2103        consistency: ConsistencyMode,
2104    ) -> Result<Option<KvEntry>, QueryError> {
2105        let lease = self
2106            .acquire_snapshot(consistency)
2107            .await
2108            .map_err(QueryError::from)?;
2109        Ok(lease
2110            .view
2111            .keyspace
2112            .kv_get(project_id, scope_id, key)
2113            .cloned())
2114    }
2115
2116    pub async fn kv_get_no_auth(
2117        &self,
2118        project_id: &str,
2119        scope_id: &str,
2120        key: &[u8],
2121        consistency: ConsistencyMode,
2122    ) -> Result<Option<KvEntry>, QueryError> {
2123        if self.require_authenticated_calls {
2124            return Err(QueryError::PermissionDenied {
2125                permission: "kv_get_no_auth is unavailable in secure mode".into(),
2126                scope: "anonymous".into(),
2127            });
2128        }
2129        self.kv_get_unchecked(project_id, scope_id, key, consistency)
2130            .await
2131    }
2132
2133    pub(crate) async fn kv_scan_prefix_unchecked(
2134        &self,
2135        project_id: &str,
2136        scope_id: &str,
2137        prefix: &[u8],
2138        limit: u64,
2139        consistency: ConsistencyMode,
2140    ) -> Result<Vec<(Vec<u8>, KvEntry)>, QueryError> {
2141        let lease = self
2142            .acquire_snapshot(consistency)
2143            .await
2144            .map_err(QueryError::from)?;
2145        let page_size = limit.min(self._config.max_scan_rows as u64) as usize;
2146        let start_bound = Bound::Included(prefix.to_vec());
2147        let end_bound = next_prefix_bytes(prefix).map_or(Bound::Unbounded, Bound::Excluded);
2148        let ns = NamespaceId::project_scope(project_id, scope_id);
2149        let mut entries = Vec::new();
2150        if let Some(kv) = lease.view.keyspace.namespaces.get(&ns).map(|n| &n.kv) {
2151            for (k, v) in kv.entries.range((start_bound, end_bound)) {
2152                if !k.starts_with(prefix) {
2153                    break;
2154                }
2155                entries.push((k.clone(), v.clone()));
2156                if entries.len() == page_size {
2157                    break;
2158                }
2159            }
2160        }
2161        Ok(entries)
2162    }
2163
2164    pub async fn kv_scan_prefix_no_auth(
2165        &self,
2166        project_id: &str,
2167        scope_id: &str,
2168        prefix: &[u8],
2169        limit: u64,
2170        consistency: ConsistencyMode,
2171    ) -> Result<Vec<(Vec<u8>, KvEntry)>, QueryError> {
2172        if self.require_authenticated_calls {
2173            return Err(QueryError::PermissionDenied {
2174                permission: "kv_scan_prefix_no_auth is unavailable in secure mode".into(),
2175                scope: "anonymous".into(),
2176            });
2177        }
2178        self.kv_scan_prefix_unchecked(project_id, scope_id, prefix, limit, consistency)
2179            .await
2180    }
2181
    /// Paginated, permission-filtered scan of KV entries under `prefix`.
    ///
    /// The first page reads at `consistency`; resumed pages (when `cursor` is
    /// supplied) are pinned to the cursor's `snapshot_seq` so a multi-page
    /// scan observes one consistent snapshot. At most
    /// `min(limit, max_scan_rows)` entries are returned; when more remain, the
    /// result carries `truncated = true` and a cursor for the next page.
    ///
    /// # Errors
    /// - `PermissionDenied` if the caller is not allowed to issue queries or
    ///   has no KvRead grant for this project/scope.
    /// - `InvalidQuery` if the cursor's `snapshot_seq` does not match the
    ///   snapshot actually acquired.
    #[allow(clippy::too_many_arguments)]
    pub async fn kv_scan_prefix(
        &self,
        project_id: &str,
        scope_id: &str,
        prefix: &[u8],
        limit: u64,
        cursor: Option<KvCursor>,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<KvScanResult, QueryError> {
        ensure_query_caller_allowed(caller)?;
        // A resumed scan re-pins the snapshot the cursor was created at.
        let effective_consistency = if let Some(c) = &cursor {
            ConsistencyMode::AtSeq(c.snapshot_seq)
        } else {
            consistency
        };
        let lease = self
            .acquire_snapshot(effective_consistency)
            .await
            .map_err(QueryError::from)?;
        let snapshot = &lease.view.keyspace;
        let catalog = &lease.view.catalog;
        let snapshot_seq = lease.view.seq;
        // `None` = no KvRead grant at all. NOTE(review): an empty prefix list
        // is treated as "unrestricted" by the filter below — confirm that is
        // the intended grant semantics.
        let allowed_prefixes =
            match catalog.kv_read_prefixes_for_caller(&caller.caller_id, project_id, scope_id) {
                Some(prefixes) => prefixes,
                None => {
                    return Err(QueryError::PermissionDenied {
                        permission: format!("KvRead({project_id}.{scope_id})"),
                        scope: caller.caller_id.clone(),
                    });
                }
            };
        // Defensive check: the acquired snapshot must match the cursor's pin.
        if let Some(c) = &cursor
            && c.snapshot_seq != snapshot_seq
        {
            return Err(QueryError::InvalidQuery {
                reason: "cursor snapshot_seq mismatch".into(),
            });
        }
        let page_size = limit.min(self._config.max_scan_rows as u64) as usize;
        // Resumed pages start strictly after the last key already returned.
        let start_bound = cursor
            .as_ref()
            .map_or(Bound::Included(prefix.to_vec()), |c| {
                Bound::Excluded(c.last_key.clone())
            });
        let end_bound = next_prefix_bytes(prefix).map_or(Bound::Unbounded, Bound::Excluded);

        let ns = NamespaceId::project_scope(project_id, scope_id);
        let mut entries = Vec::new();
        if let Some(kv) = snapshot.namespaces.get(&ns).map(|n| &n.kv) {
            for (k, v) in kv.entries.range((start_bound, end_bound)) {
                if !k.starts_with(prefix) {
                    break;
                }
                // Drop keys outside the caller's granted prefixes (an empty
                // grant list passes everything through).
                if !allowed_prefixes.is_empty()
                    && !allowed_prefixes
                        .iter()
                        .any(|allowed| k.starts_with(allowed))
                {
                    continue;
                }
                entries.push((k.clone(), v.clone()));
                // Collect one row past the page to detect truncation without a
                // second probe; the extra row is trimmed below.
                if entries.len() > page_size {
                    break;
                }
            }
        }

        let truncated = entries.len() > page_size;
        if truncated {
            entries.truncate(page_size);
        }
        // Only a truncated page yields a cursor; the last returned key becomes
        // the resume point.
        let next_cursor = if truncated {
            entries.last().map(|(k, _)| KvCursor {
                snapshot_seq,
                last_key: k.clone(),
                page_size: page_size as u64,
            })
        } else {
            None
        };
        Ok(KvScanResult {
            entries,
            cursor: next_cursor,
            snapshot_seq,
            truncated,
        })
    }
2272
2273    pub async fn kv_scan_prefix_default_scope(
2274        &self,
2275        project_id: &str,
2276        prefix: &[u8],
2277        limit: u64,
2278        cursor: Option<KvCursor>,
2279        consistency: ConsistencyMode,
2280        caller: &CallerContext,
2281    ) -> Result<KvScanResult, QueryError> {
2282        self.kv_scan_prefix(
2283            project_id,
2284            crate::catalog::DEFAULT_SCOPE_ID,
2285            prefix,
2286            limit,
2287            cursor,
2288            consistency,
2289            caller,
2290        )
2291        .await
2292    }
2293
    /// Paginated, permission-filtered scan of KV entries in `[start, end)`
    /// bound terms.
    ///
    /// Mirrors [`Self::kv_scan_prefix`]: resumed pages (via `cursor`) are
    /// pinned to the cursor's `snapshot_seq`, at most
    /// `min(limit, max_scan_rows)` entries are returned, and a cursor is
    /// produced only when the page was truncated.
    ///
    /// # Errors
    /// - `PermissionDenied` if the caller is not allowed to issue queries or
    ///   has no KvRead grant for this project/scope.
    /// - `InvalidQuery` if the cursor's `snapshot_seq` does not match the
    ///   snapshot actually acquired.
    #[allow(clippy::too_many_arguments)]
    pub async fn kv_scan_range(
        &self,
        project_id: &str,
        scope_id: &str,
        start: Bound<Vec<u8>>,
        end: Bound<Vec<u8>>,
        limit: u64,
        cursor: Option<KvCursor>,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<KvScanResult, QueryError> {
        ensure_query_caller_allowed(caller)?;
        // A resumed scan re-pins the snapshot the cursor was created at.
        let effective_consistency = if let Some(c) = &cursor {
            ConsistencyMode::AtSeq(c.snapshot_seq)
        } else {
            consistency
        };
        let lease = self
            .acquire_snapshot(effective_consistency)
            .await
            .map_err(QueryError::from)?;
        let snapshot = &lease.view.keyspace;
        let catalog = &lease.view.catalog;
        let snapshot_seq = lease.view.seq;
        // `None` = no KvRead grant at all. NOTE(review): an empty prefix list
        // is treated as "unrestricted" by the filter below — confirm that is
        // the intended grant semantics.
        let allowed_prefixes =
            match catalog.kv_read_prefixes_for_caller(&caller.caller_id, project_id, scope_id) {
                Some(prefixes) => prefixes,
                None => {
                    return Err(QueryError::PermissionDenied {
                        permission: format!("KvRead({project_id}.{scope_id})"),
                        scope: caller.caller_id.clone(),
                    });
                }
            };
        // Defensive check: the acquired snapshot must match the cursor's pin.
        if let Some(c) = &cursor
            && c.snapshot_seq != snapshot_seq
        {
            return Err(QueryError::InvalidQuery {
                reason: "cursor snapshot_seq mismatch".into(),
            });
        }
        let page_size = limit.min(self._config.max_scan_rows as u64) as usize;
        // When resuming, the caller's `start` bound is superseded by the
        // cursor: the scan restarts strictly after the last returned key.
        let adjusted_start = match (&cursor, start) {
            (Some(c), _) => Bound::Excluded(c.last_key.clone()),
            (None, b) => b,
        };

        let ns = NamespaceId::project_scope(project_id, scope_id);
        let mut entries = Vec::new();
        if let Some(kv) = snapshot.namespaces.get(&ns).map(|n| &n.kv) {
            for (k, v) in kv.entries.range((adjusted_start, end)) {
                // Drop keys outside the caller's granted prefixes (an empty
                // grant list passes everything through).
                if !allowed_prefixes.is_empty()
                    && !allowed_prefixes
                        .iter()
                        .any(|allowed| k.starts_with(allowed))
                {
                    continue;
                }
                entries.push((k.clone(), v.clone()));
                // Collect one row past the page to detect truncation without a
                // second probe; the extra row is trimmed below.
                if entries.len() > page_size {
                    break;
                }
            }
        }

        let truncated = entries.len() > page_size;
        if truncated {
            entries.truncate(page_size);
        }
        // Only a truncated page yields a cursor; the last returned key becomes
        // the resume point.
        let next_cursor = if truncated {
            entries.last().map(|(k, _)| KvCursor {
                snapshot_seq,
                last_key: k.clone(),
                page_size: page_size as u64,
            })
        } else {
            None
        };
        Ok(KvScanResult {
            entries,
            cursor: next_cursor,
            snapshot_seq,
            truncated,
        })
    }
2380
2381    pub async fn kv_scan_all_scopes(
2382        &self,
2383        project_id: &str,
2384        prefix: &[u8],
2385        limit: u64,
2386        consistency: ConsistencyMode,
2387        caller: &CallerContext,
2388    ) -> Result<Vec<ScopedKvEntry>, QueryError> {
2389        ensure_query_caller_allowed(caller)?;
2390        let (_, catalog, _) = self.executor.snapshot_state().await;
2391        let project_read = catalog.has_permission(
2392            &caller.caller_id,
2393            &Permission::KvRead {
2394                project_id: project_id.to_string(),
2395                scope_id: None,
2396                prefix: None,
2397            },
2398        ) || catalog.has_permission(
2399            &caller.caller_id,
2400            &Permission::ProjectAdmin {
2401                project_id: project_id.to_string(),
2402            },
2403        );
2404        if !project_read {
2405            return Err(QueryError::PermissionDenied {
2406                permission: format!("KvRead({project_id}.*)"),
2407                scope: caller.caller_id.clone(),
2408            });
2409        }
2410        let lease = self
2411            .acquire_snapshot(consistency)
2412            .await
2413            .map_err(QueryError::from)?;
2414        let snapshot = &lease.view.keyspace;
2415        let mut out = Vec::new();
2416        for (ns_id, ns) in snapshot.namespaces.iter() {
2417            let Some(ns_key) = ns_id.as_project_scope_key() else {
2418                continue;
2419            };
2420            let Some((p, scope)) = ns_key.split_once("::") else {
2421                continue;
2422            };
2423            if p != project_id {
2424                continue;
2425            }
2426            for (k, v) in ns.kv.entries.iter() {
2427                if !k.starts_with(prefix) {
2428                    continue;
2429                }
2430                out.push(ScopedKvEntry {
2431                    scope_id: scope.to_string(),
2432                    key: k.clone(),
2433                    value: v.value.clone(),
2434                    version: v.version,
2435                });
2436                if out.len() >= limit as usize {
2437                    return Ok(out);
2438                }
2439            }
2440        }
2441        Ok(out)
2442    }
2443
2444    pub async fn kv_set(
2445        &self,
2446        project_id: &str,
2447        scope_id: &str,
2448        key: Vec<u8>,
2449        value: Vec<u8>,
2450    ) -> Result<CommitResult, AedbError> {
2451        self.commit(Mutation::KvSet {
2452            project_id: project_id.to_string(),
2453            scope_id: scope_id.to_string(),
2454            key,
2455            value,
2456        })
2457        .await
2458    }
2459
2460    pub async fn kv_set_default_scope(
2461        &self,
2462        project_id: &str,
2463        key: Vec<u8>,
2464        value: Vec<u8>,
2465    ) -> Result<CommitResult, AedbError> {
2466        self.kv_set(project_id, crate::catalog::DEFAULT_SCOPE_ID, key, value)
2467            .await
2468    }
2469
2470    pub async fn kv_set_as(
2471        &self,
2472        caller: CallerContext,
2473        project_id: &str,
2474        scope_id: &str,
2475        key: Vec<u8>,
2476        value: Vec<u8>,
2477    ) -> Result<CommitResult, AedbError> {
2478        self.commit_as(
2479            caller,
2480            Mutation::KvSet {
2481                project_id: project_id.to_string(),
2482                scope_id: scope_id.to_string(),
2483                key,
2484                value,
2485            },
2486        )
2487        .await
2488    }
2489
2490    pub async fn kv_del(
2491        &self,
2492        project_id: &str,
2493        scope_id: &str,
2494        key: Vec<u8>,
2495    ) -> Result<CommitResult, AedbError> {
2496        self.commit(Mutation::KvDel {
2497            project_id: project_id.to_string(),
2498            scope_id: scope_id.to_string(),
2499            key,
2500        })
2501        .await
2502    }
2503
2504    pub async fn kv_del_default_scope(
2505        &self,
2506        project_id: &str,
2507        key: Vec<u8>,
2508    ) -> Result<CommitResult, AedbError> {
2509        self.kv_del(project_id, crate::catalog::DEFAULT_SCOPE_ID, key)
2510            .await
2511    }
2512
2513    pub async fn kv_del_as(
2514        &self,
2515        caller: CallerContext,
2516        project_id: &str,
2517        scope_id: &str,
2518        key: Vec<u8>,
2519    ) -> Result<CommitResult, AedbError> {
2520        self.commit_as(
2521            caller,
2522            Mutation::KvDel {
2523                project_id: project_id.to_string(),
2524                scope_id: scope_id.to_string(),
2525                key,
2526            },
2527        )
2528        .await
2529    }
2530
2531    pub async fn kv_inc_u256(
2532        &self,
2533        project_id: &str,
2534        scope_id: &str,
2535        key: Vec<u8>,
2536        amount_be: [u8; 32],
2537    ) -> Result<CommitResult, AedbError> {
2538        self.commit(Mutation::KvIncU256 {
2539            project_id: project_id.to_string(),
2540            scope_id: scope_id.to_string(),
2541            key,
2542            amount_be,
2543        })
2544        .await
2545    }
2546
2547    pub async fn kv_inc_u256_as(
2548        &self,
2549        caller: CallerContext,
2550        project_id: &str,
2551        scope_id: &str,
2552        key: Vec<u8>,
2553        amount_be: [u8; 32],
2554    ) -> Result<CommitResult, AedbError> {
2555        self.commit_as(
2556            caller,
2557            Mutation::KvIncU256 {
2558                project_id: project_id.to_string(),
2559                scope_id: scope_id.to_string(),
2560                key,
2561                amount_be,
2562            },
2563        )
2564        .await
2565    }
2566
2567    pub async fn kv_dec_u256(
2568        &self,
2569        project_id: &str,
2570        scope_id: &str,
2571        key: Vec<u8>,
2572        amount_be: [u8; 32],
2573    ) -> Result<CommitResult, AedbError> {
2574        self.commit(Mutation::KvDecU256 {
2575            project_id: project_id.to_string(),
2576            scope_id: scope_id.to_string(),
2577            key,
2578            amount_be,
2579        })
2580        .await
2581    }
2582
2583    pub async fn kv_dec_u256_as(
2584        &self,
2585        caller: CallerContext,
2586        project_id: &str,
2587        scope_id: &str,
2588        key: Vec<u8>,
2589        amount_be: [u8; 32],
2590    ) -> Result<CommitResult, AedbError> {
2591        self.commit_as(
2592            caller,
2593            Mutation::KvDecU256 {
2594                project_id: project_id.to_string(),
2595                scope_id: scope_id.to_string(),
2596                key,
2597                amount_be,
2598            },
2599        )
2600        .await
2601    }
2602
2603    pub async fn kv_compare_and_swap(
2604        &self,
2605        project_id: &str,
2606        scope_id: &str,
2607        key: Vec<u8>,
2608        value: Vec<u8>,
2609        expected_seq: u64,
2610    ) -> Result<CommitResult, AedbError> {
2611        self.commit_envelope(TransactionEnvelope {
2612            caller: None,
2613            idempotency_key: None,
2614            write_class: crate::commit::tx::WriteClass::Standard,
2615            assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2616                project_id: project_id.to_string(),
2617                scope_id: scope_id.to_string(),
2618                key: key.clone(),
2619                expected_seq,
2620            }],
2621            read_set: crate::commit::tx::ReadSet::default(),
2622            write_intent: crate::commit::tx::WriteIntent {
2623                mutations: vec![Mutation::KvSet {
2624                    project_id: project_id.to_string(),
2625                    scope_id: scope_id.to_string(),
2626                    key,
2627                    value,
2628                }],
2629            },
2630            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2631        })
2632        .await
2633    }
2634
2635    pub async fn kv_compare_and_swap_as(
2636        &self,
2637        caller: CallerContext,
2638        project_id: &str,
2639        scope_id: &str,
2640        key: Vec<u8>,
2641        value: Vec<u8>,
2642        expected_seq: u64,
2643    ) -> Result<CommitResult, AedbError> {
2644        self.commit_envelope(TransactionEnvelope {
2645            caller: Some(caller),
2646            idempotency_key: None,
2647            write_class: crate::commit::tx::WriteClass::Standard,
2648            assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2649                project_id: project_id.to_string(),
2650                scope_id: scope_id.to_string(),
2651                key: key.clone(),
2652                expected_seq,
2653            }],
2654            read_set: crate::commit::tx::ReadSet::default(),
2655            write_intent: crate::commit::tx::WriteIntent {
2656                mutations: vec![Mutation::KvSet {
2657                    project_id: project_id.to_string(),
2658                    scope_id: scope_id.to_string(),
2659                    key,
2660                    value,
2661                }],
2662            },
2663            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2664        })
2665        .await
2666    }
2667
2668    pub async fn kv_compare_and_inc_u256(
2669        &self,
2670        project_id: &str,
2671        scope_id: &str,
2672        key: Vec<u8>,
2673        amount_be: [u8; 32],
2674        expected_seq: u64,
2675    ) -> Result<CommitResult, AedbError> {
2676        self.commit_envelope(TransactionEnvelope {
2677            caller: None,
2678            idempotency_key: None,
2679            write_class: crate::commit::tx::WriteClass::Standard,
2680            assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2681                project_id: project_id.to_string(),
2682                scope_id: scope_id.to_string(),
2683                key: key.clone(),
2684                expected_seq,
2685            }],
2686            read_set: crate::commit::tx::ReadSet::default(),
2687            write_intent: crate::commit::tx::WriteIntent {
2688                mutations: vec![Mutation::KvIncU256 {
2689                    project_id: project_id.to_string(),
2690                    scope_id: scope_id.to_string(),
2691                    key,
2692                    amount_be,
2693                }],
2694            },
2695            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2696        })
2697        .await
2698    }
2699
2700    pub async fn kv_compare_and_inc_u256_as(
2701        &self,
2702        caller: CallerContext,
2703        project_id: &str,
2704        scope_id: &str,
2705        key: Vec<u8>,
2706        amount_be: [u8; 32],
2707        expected_seq: u64,
2708    ) -> Result<CommitResult, AedbError> {
2709        self.commit_envelope(TransactionEnvelope {
2710            caller: Some(caller),
2711            idempotency_key: None,
2712            write_class: crate::commit::tx::WriteClass::Standard,
2713            assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2714                project_id: project_id.to_string(),
2715                scope_id: scope_id.to_string(),
2716                key: key.clone(),
2717                expected_seq,
2718            }],
2719            read_set: crate::commit::tx::ReadSet::default(),
2720            write_intent: crate::commit::tx::WriteIntent {
2721                mutations: vec![Mutation::KvIncU256 {
2722                    project_id: project_id.to_string(),
2723                    scope_id: scope_id.to_string(),
2724                    key,
2725                    amount_be,
2726                }],
2727            },
2728            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2729        })
2730        .await
2731    }
2732
2733    pub async fn kv_compare_and_dec_u256(
2734        &self,
2735        project_id: &str,
2736        scope_id: &str,
2737        key: Vec<u8>,
2738        amount_be: [u8; 32],
2739        expected_seq: u64,
2740    ) -> Result<CommitResult, AedbError> {
2741        self.commit_envelope(TransactionEnvelope {
2742            caller: None,
2743            idempotency_key: None,
2744            write_class: crate::commit::tx::WriteClass::Standard,
2745            assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2746                project_id: project_id.to_string(),
2747                scope_id: scope_id.to_string(),
2748                key: key.clone(),
2749                expected_seq,
2750            }],
2751            read_set: crate::commit::tx::ReadSet::default(),
2752            write_intent: crate::commit::tx::WriteIntent {
2753                mutations: vec![Mutation::KvDecU256 {
2754                    project_id: project_id.to_string(),
2755                    scope_id: scope_id.to_string(),
2756                    key,
2757                    amount_be,
2758                }],
2759            },
2760            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2761        })
2762        .await
2763    }
2764
2765    pub async fn kv_compare_and_dec_u256_as(
2766        &self,
2767        caller: CallerContext,
2768        project_id: &str,
2769        scope_id: &str,
2770        key: Vec<u8>,
2771        amount_be: [u8; 32],
2772        expected_seq: u64,
2773    ) -> Result<CommitResult, AedbError> {
2774        self.commit_envelope(TransactionEnvelope {
2775            caller: Some(caller),
2776            idempotency_key: None,
2777            write_class: crate::commit::tx::WriteClass::Standard,
2778            assertions: vec![crate::commit::tx::ReadAssertion::KeyVersion {
2779                project_id: project_id.to_string(),
2780                scope_id: scope_id.to_string(),
2781                key: key.clone(),
2782                expected_seq,
2783            }],
2784            read_set: crate::commit::tx::ReadSet::default(),
2785            write_intent: crate::commit::tx::WriteIntent {
2786                mutations: vec![Mutation::KvDecU256 {
2787                    project_id: project_id.to_string(),
2788                    scope_id: scope_id.to_string(),
2789                    key,
2790                    amount_be,
2791                }],
2792            },
2793            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
2794        })
2795        .await
2796    }
2797
2798    pub async fn table_inc_u256(
2799        &self,
2800        project_id: &str,
2801        scope_id: &str,
2802        table_name: &str,
2803        primary_key: Vec<Value>,
2804        column: &str,
2805        amount_be: [u8; 32],
2806    ) -> Result<CommitResult, AedbError> {
2807        self.commit(Mutation::TableIncU256 {
2808            project_id: project_id.to_string(),
2809            scope_id: scope_id.to_string(),
2810            table_name: table_name.to_string(),
2811            primary_key,
2812            column: column.to_string(),
2813            amount_be,
2814        })
2815        .await
2816    }
2817
2818    pub async fn table_inc_u256_as_with(
2819        &self,
2820        request: TableU256MutationRequest,
2821    ) -> Result<CommitResult, AedbError> {
2822        self.commit_as(
2823            request.caller,
2824            Mutation::TableIncU256 {
2825                project_id: request.project_id,
2826                scope_id: request.scope_id,
2827                table_name: request.table_name,
2828                primary_key: request.primary_key,
2829                column: request.column,
2830                amount_be: request.amount_be,
2831            },
2832        )
2833        .await
2834    }
2835
2836    #[allow(clippy::too_many_arguments)]
2837    pub async fn table_inc_u256_as(
2838        &self,
2839        caller: CallerContext,
2840        project_id: &str,
2841        scope_id: &str,
2842        table_name: &str,
2843        primary_key: Vec<Value>,
2844        column: &str,
2845        amount_be: [u8; 32],
2846    ) -> Result<CommitResult, AedbError> {
2847        self.table_inc_u256_as_with(TableU256MutationRequest {
2848            caller,
2849            project_id: project_id.to_string(),
2850            scope_id: scope_id.to_string(),
2851            table_name: table_name.to_string(),
2852            primary_key,
2853            column: column.to_string(),
2854            amount_be,
2855        })
2856        .await
2857    }
2858
2859    pub async fn table_dec_u256(
2860        &self,
2861        project_id: &str,
2862        scope_id: &str,
2863        table_name: &str,
2864        primary_key: Vec<Value>,
2865        column: &str,
2866        amount_be: [u8; 32],
2867    ) -> Result<CommitResult, AedbError> {
2868        self.commit(Mutation::TableDecU256 {
2869            project_id: project_id.to_string(),
2870            scope_id: scope_id.to_string(),
2871            table_name: table_name.to_string(),
2872            primary_key,
2873            column: column.to_string(),
2874            amount_be,
2875        })
2876        .await
2877    }
2878
2879    pub async fn table_dec_u256_as_with(
2880        &self,
2881        request: TableU256MutationRequest,
2882    ) -> Result<CommitResult, AedbError> {
2883        self.commit_as(
2884            request.caller,
2885            Mutation::TableDecU256 {
2886                project_id: request.project_id,
2887                scope_id: request.scope_id,
2888                table_name: request.table_name,
2889                primary_key: request.primary_key,
2890                column: request.column,
2891                amount_be: request.amount_be,
2892            },
2893        )
2894        .await
2895    }
2896
2897    #[allow(clippy::too_many_arguments)]
2898    pub async fn table_dec_u256_as(
2899        &self,
2900        caller: CallerContext,
2901        project_id: &str,
2902        scope_id: &str,
2903        table_name: &str,
2904        primary_key: Vec<Value>,
2905        column: &str,
2906        amount_be: [u8; 32],
2907    ) -> Result<CommitResult, AedbError> {
2908        self.table_dec_u256_as_with(TableU256MutationRequest {
2909            caller,
2910            project_id: project_id.to_string(),
2911            scope_id: scope_id.to_string(),
2912            table_name: table_name.to_string(),
2913            primary_key,
2914            column: column.to_string(),
2915            amount_be,
2916        })
2917        .await
2918    }
2919
2920    pub async fn order_book_new(
2921        &self,
2922        project_id: &str,
2923        scope_id: &str,
2924        request: OrderRequest,
2925    ) -> Result<CommitResult, AedbError> {
2926        self.preflight_order_book_new_if_high_reject_risk(None, project_id, scope_id, &request)
2927            .await?;
2928        let mutation = Mutation::OrderBookNew {
2929            project_id: project_id.to_string(),
2930            scope_id: scope_id.to_string(),
2931            request,
2932        };
2933        let (_, catalog, _) = self.executor.snapshot_state().await;
2934        validate_mutation_with_config(&catalog, &mutation, &self._config)?;
2935        self.commit_prevalidated_internal("order_book_new", mutation)
2936            .await
2937    }
2938
2939    pub async fn order_book_new_with_finality(
2940        &self,
2941        project_id: &str,
2942        scope_id: &str,
2943        request: OrderRequest,
2944        finality: CommitFinality,
2945    ) -> Result<CommitResult, AedbError> {
2946        let mut result = self.order_book_new(project_id, scope_id, request).await?;
2947        self.enforce_finality(&mut result, finality).await?;
2948        Ok(result)
2949    }
2950
2951    pub async fn order_book_define_table(
2952        &self,
2953        project_id: &str,
2954        scope_id: &str,
2955        table_id: &str,
2956        mode: OrderBookTableMode,
2957    ) -> Result<CommitResult, AedbError> {
2958        self.commit_prevalidated_internal(
2959            "order_book_define_table",
2960            Mutation::OrderBookDefineTable {
2961                project_id: project_id.to_string(),
2962                scope_id: scope_id.to_string(),
2963                table_id: table_id.to_string(),
2964                mode,
2965            },
2966        )
2967        .await
2968    }
2969
2970    pub async fn order_book_define_table_as(
2971        &self,
2972        caller: CallerContext,
2973        project_id: &str,
2974        scope_id: &str,
2975        table_id: &str,
2976        mode: OrderBookTableMode,
2977    ) -> Result<CommitResult, AedbError> {
2978        self.commit_as(
2979            caller,
2980            Mutation::OrderBookDefineTable {
2981                project_id: project_id.to_string(),
2982                scope_id: scope_id.to_string(),
2983                table_id: table_id.to_string(),
2984                mode,
2985            },
2986        )
2987        .await
2988    }
2989
2990    pub async fn order_book_drop_table(
2991        &self,
2992        project_id: &str,
2993        scope_id: &str,
2994        table_id: &str,
2995    ) -> Result<CommitResult, AedbError> {
2996        self.commit_prevalidated_internal(
2997            "order_book_drop_table",
2998            Mutation::OrderBookDropTable {
2999                project_id: project_id.to_string(),
3000                scope_id: scope_id.to_string(),
3001                table_id: table_id.to_string(),
3002            },
3003        )
3004        .await
3005    }
3006
3007    pub async fn order_book_drop_table_as(
3008        &self,
3009        caller: CallerContext,
3010        project_id: &str,
3011        scope_id: &str,
3012        table_id: &str,
3013    ) -> Result<CommitResult, AedbError> {
3014        self.commit_as(
3015            caller,
3016            Mutation::OrderBookDropTable {
3017                project_id: project_id.to_string(),
3018                scope_id: scope_id.to_string(),
3019                table_id: table_id.to_string(),
3020            },
3021        )
3022        .await
3023    }
3024
3025    pub async fn order_book_set_instrument_config(
3026        &self,
3027        project_id: &str,
3028        scope_id: &str,
3029        instrument: &str,
3030        config: InstrumentConfig,
3031    ) -> Result<CommitResult, AedbError> {
3032        self.commit_prevalidated_internal(
3033            "order_book_set_instrument_config",
3034            Mutation::OrderBookSetInstrumentConfig {
3035                project_id: project_id.to_string(),
3036                scope_id: scope_id.to_string(),
3037                instrument: instrument.to_string(),
3038                config,
3039            },
3040        )
3041        .await
3042    }
3043
3044    pub async fn order_book_set_instrument_config_as(
3045        &self,
3046        caller: CallerContext,
3047        project_id: &str,
3048        scope_id: &str,
3049        instrument: &str,
3050        config: InstrumentConfig,
3051    ) -> Result<CommitResult, AedbError> {
3052        self.commit_as(
3053            caller,
3054            Mutation::OrderBookSetInstrumentConfig {
3055                project_id: project_id.to_string(),
3056                scope_id: scope_id.to_string(),
3057                instrument: instrument.to_string(),
3058                config,
3059            },
3060        )
3061        .await
3062    }
3063
3064    pub async fn order_book_set_instrument_halted(
3065        &self,
3066        project_id: &str,
3067        scope_id: &str,
3068        instrument: &str,
3069        halted: bool,
3070    ) -> Result<CommitResult, AedbError> {
3071        self.commit_prevalidated_internal(
3072            "order_book_set_instrument_halted",
3073            Mutation::OrderBookSetInstrumentHalted {
3074                project_id: project_id.to_string(),
3075                scope_id: scope_id.to_string(),
3076                instrument: instrument.to_string(),
3077                halted,
3078            },
3079        )
3080        .await
3081    }
3082
3083    pub async fn order_book_set_instrument_halted_as(
3084        &self,
3085        caller: CallerContext,
3086        project_id: &str,
3087        scope_id: &str,
3088        instrument: &str,
3089        halted: bool,
3090    ) -> Result<CommitResult, AedbError> {
3091        self.commit_as(
3092            caller,
3093            Mutation::OrderBookSetInstrumentHalted {
3094                project_id: project_id.to_string(),
3095                scope_id: scope_id.to_string(),
3096                instrument: instrument.to_string(),
3097                halted,
3098            },
3099        )
3100        .await
3101    }
3102
3103    pub async fn order_book_new_in_table(
3104        &self,
3105        project_id: &str,
3106        scope_id: &str,
3107        table_id: &str,
3108        asset_id: &str,
3109        mut request: OrderRequest,
3110    ) -> Result<CommitResult, AedbError> {
3111        request.instrument = scoped_instrument(table_id, asset_id);
3112        self.order_book_new(project_id, scope_id, request).await
3113    }
3114
3115    pub async fn order_book_new_as(
3116        &self,
3117        caller: CallerContext,
3118        project_id: &str,
3119        scope_id: &str,
3120        request: OrderRequest,
3121    ) -> Result<CommitResult, AedbError> {
3122        self.preflight_order_book_new_if_high_reject_risk(
3123            Some(&caller),
3124            project_id,
3125            scope_id,
3126            &request,
3127        )
3128        .await?;
3129        self.commit_as(
3130            caller,
3131            Mutation::OrderBookNew {
3132                project_id: project_id.to_string(),
3133                scope_id: scope_id.to_string(),
3134                request,
3135            },
3136        )
3137        .await
3138    }
3139
3140    pub async fn order_book_new_as_with_finality(
3141        &self,
3142        caller: CallerContext,
3143        project_id: &str,
3144        scope_id: &str,
3145        request: OrderRequest,
3146        finality: CommitFinality,
3147    ) -> Result<CommitResult, AedbError> {
3148        let mut result = self
3149            .order_book_new_as(caller, project_id, scope_id, request)
3150            .await?;
3151        self.enforce_finality(&mut result, finality).await?;
3152        Ok(result)
3153    }
3154
3155    fn should_preflight_order_book_new(request: &OrderRequest) -> bool {
3156        request.exec_instructions.post_only()
3157            || matches!(request.time_in_force, TimeInForce::Fok)
3158            || matches!(request.order_type, OrderType::Market)
3159    }
3160
3161    async fn preflight_order_book_new_if_high_reject_risk(
3162        &self,
3163        caller: Option<&CallerContext>,
3164        project_id: &str,
3165        scope_id: &str,
3166        request: &OrderRequest,
3167    ) -> Result<(), AedbError> {
3168        if !Self::should_preflight_order_book_new(request) {
3169            return Ok(());
3170        }
3171        let mutation = Mutation::OrderBookNew {
3172            project_id: project_id.to_string(),
3173            scope_id: scope_id.to_string(),
3174            request: request.clone(),
3175        };
3176        let preflight_result = if let Some(caller) = caller {
3177            self.preflight_as(caller, mutation).await?
3178        } else {
3179            self.preflight(mutation).await
3180        };
3181        if let PreflightResult::Err { reason } = preflight_result {
3182            self.upstream_validation_rejections
3183                .fetch_add(1, Ordering::Relaxed);
3184            return Err(AedbError::Validation(reason));
3185        }
3186        Ok(())
3187    }
3188
3189    pub async fn order_book_cancel(
3190        &self,
3191        project_id: &str,
3192        scope_id: &str,
3193        instrument: &str,
3194        order_id: u64,
3195        owner: &str,
3196    ) -> Result<CommitResult, AedbError> {
3197        self.commit_prevalidated_internal(
3198            "order_book_cancel",
3199            Mutation::OrderBookCancel {
3200                project_id: project_id.to_string(),
3201                scope_id: scope_id.to_string(),
3202                instrument: instrument.to_string(),
3203                order_id,
3204                client_order_id: None,
3205                owner: owner.to_string(),
3206            },
3207        )
3208        .await
3209    }
3210
3211    pub async fn order_book_cancel_with_finality(
3212        &self,
3213        project_id: &str,
3214        scope_id: &str,
3215        instrument: &str,
3216        order_id: u64,
3217        owner: &str,
3218        finality: CommitFinality,
3219    ) -> Result<CommitResult, AedbError> {
3220        self.commit_prevalidated_internal_with_finality(
3221            "order_book_cancel",
3222            Mutation::OrderBookCancel {
3223                project_id: project_id.to_string(),
3224                scope_id: scope_id.to_string(),
3225                instrument: instrument.to_string(),
3226                order_id,
3227                client_order_id: None,
3228                owner: owner.to_string(),
3229            },
3230            finality,
3231        )
3232        .await
3233    }
3234
3235    pub async fn order_book_cancel_strict(
3236        &self,
3237        project_id: &str,
3238        scope_id: &str,
3239        instrument: &str,
3240        order_id: u64,
3241        owner: &str,
3242        finality: CommitFinality,
3243    ) -> Result<CommitResult, AedbError> {
3244        self.order_book_cancel_strict_as_internal(
3245            None, project_id, scope_id, instrument, order_id, owner, finality,
3246        )
3247        .await
3248    }
3249
3250    pub async fn order_book_cancel_as(
3251        &self,
3252        caller: CallerContext,
3253        project_id: &str,
3254        scope_id: &str,
3255        instrument: &str,
3256        order_id: u64,
3257        owner: &str,
3258    ) -> Result<CommitResult, AedbError> {
3259        self.commit_as(
3260            caller,
3261            Mutation::OrderBookCancel {
3262                project_id: project_id.to_string(),
3263                scope_id: scope_id.to_string(),
3264                instrument: instrument.to_string(),
3265                order_id,
3266                client_order_id: None,
3267                owner: owner.to_string(),
3268            },
3269        )
3270        .await
3271    }
3272
3273    #[allow(clippy::too_many_arguments)]
3274    pub async fn order_book_cancel_as_with_finality(
3275        &self,
3276        caller: CallerContext,
3277        project_id: &str,
3278        scope_id: &str,
3279        instrument: &str,
3280        order_id: u64,
3281        owner: &str,
3282        finality: CommitFinality,
3283    ) -> Result<CommitResult, AedbError> {
3284        let mut result = self
3285            .order_book_cancel_as(caller, project_id, scope_id, instrument, order_id, owner)
3286            .await?;
3287        self.enforce_finality(&mut result, finality).await?;
3288        Ok(result)
3289    }
3290
3291    #[allow(clippy::too_many_arguments)]
3292    pub async fn order_book_cancel_strict_as(
3293        &self,
3294        caller: CallerContext,
3295        project_id: &str,
3296        scope_id: &str,
3297        instrument: &str,
3298        order_id: u64,
3299        owner: &str,
3300        finality: CommitFinality,
3301    ) -> Result<CommitResult, AedbError> {
3302        self.order_book_cancel_strict_as_internal(
3303            Some(caller),
3304            project_id,
3305            scope_id,
3306            instrument,
3307            order_id,
3308            owner,
3309            finality,
3310        )
3311        .await
3312    }
3313
    /// Strict-cancel core shared by [`order_book_cancel_strict`] and
    /// [`order_book_cancel_strict_as`].
    ///
    /// Reads the order at a latest-consistency snapshot, rejects the request
    /// up front when the order is missing, owned by someone else, or no
    /// longer cancellable, then commits an `OrderBookCancel` whose read-set
    /// pins the order key at the observed version — so a concurrent write to
    /// the order invalidates the read-set and aborts the commit.
    #[allow(clippy::too_many_arguments)]
    async fn order_book_cancel_strict_as_internal(
        &self,
        caller: Option<CallerContext>,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        // Snapshot lease: a stable view of the keyspace for all checks below,
        // and the sequence number the commit will be validated against.
        let lease = self.acquire_snapshot(ConsistencyMode::AtLatest).await?;
        let order_key = key_order(instrument, order_id);
        let Some(entry) = lease.view.keyspace.kv_get(project_id, scope_id, &order_key) else {
            return Err(AedbError::Validation(format!(
                "strict cancel target not found: order_id={order_id}"
            )));
        };
        // Orders are stored MessagePack-encoded (rmp_serde).
        let order: OrderRecord =
            rmp_serde::from_slice(&entry.value).map_err(|e| AedbError::Decode(e.to_string()))?;
        if order.owner != owner {
            return Err(AedbError::PermissionDenied(
                "order ownership mismatch".into(),
            ));
        }
        // Only Open / PartiallyFilled orders with nonzero remaining quantity
        // are cancellable.
        if !matches!(
            order.status,
            crate::order_book::OrderStatus::Open | crate::order_book::OrderStatus::PartiallyFilled
        ) || u256_from_be(order.remaining_qty_be).is_zero()
        {
            return Err(AedbError::Validation(format!(
                "order not cancellable in current status: {:?}",
                order.status
            )));
        }
        let envelope = TransactionEnvelope {
            caller,
            idempotency_key: None,
            write_class: WriteClass::Standard,
            assertions: Vec::new(),
            // Pin the order key at the version just read: any concurrent
            // modification of the order aborts this commit.
            read_set: ReadSet {
                points: vec![ReadSetEntry {
                    key: ReadKey::KvKey {
                        project_id: project_id.to_string(),
                        scope_id: scope_id.to_string(),
                        key: order_key,
                    },
                    version_at_read: entry.version,
                }],
                ranges: Vec::new(),
            },
            write_intent: WriteIntent {
                mutations: vec![Mutation::OrderBookCancel {
                    project_id: project_id.to_string(),
                    scope_id: scope_id.to_string(),
                    instrument: instrument.to_string(),
                    order_id,
                    client_order_id: None,
                    owner: owner.to_string(),
                }],
            },
            // Anchor validation at the snapshot's sequence.
            base_seq: lease.view.seq,
        };
        self.commit_envelope_with_finality(envelope, finality).await
    }
3379
3380    pub async fn order_book_cancel_by_client_id(
3381        &self,
3382        project_id: &str,
3383        scope_id: &str,
3384        instrument: &str,
3385        client_order_id: &str,
3386        owner: &str,
3387    ) -> Result<CommitResult, AedbError> {
3388        self.commit_prevalidated_internal(
3389            "order_book_cancel_by_client_id",
3390            Mutation::OrderBookCancel {
3391                project_id: project_id.to_string(),
3392                scope_id: scope_id.to_string(),
3393                instrument: instrument.to_string(),
3394                order_id: 0,
3395                client_order_id: Some(client_order_id.to_string()),
3396                owner: owner.to_string(),
3397            },
3398        )
3399        .await
3400    }
3401
3402    pub async fn order_book_cancel_by_client_id_as(
3403        &self,
3404        caller: CallerContext,
3405        project_id: &str,
3406        scope_id: &str,
3407        instrument: &str,
3408        client_order_id: &str,
3409        owner: &str,
3410    ) -> Result<CommitResult, AedbError> {
3411        self.commit_as(
3412            caller,
3413            Mutation::OrderBookCancel {
3414                project_id: project_id.to_string(),
3415                scope_id: scope_id.to_string(),
3416                instrument: instrument.to_string(),
3417                order_id: 0,
3418                client_order_id: Some(client_order_id.to_string()),
3419                owner: owner.to_string(),
3420            },
3421        )
3422        .await
3423    }
3424
3425    pub async fn order_book_cancel_by_client_id_strict(
3426        &self,
3427        project_id: &str,
3428        scope_id: &str,
3429        instrument: &str,
3430        client_order_id: &str,
3431        owner: &str,
3432        finality: CommitFinality,
3433    ) -> Result<CommitResult, AedbError> {
3434        self.order_book_cancel_by_client_id_strict_as_internal(
3435            None,
3436            project_id,
3437            scope_id,
3438            instrument,
3439            client_order_id,
3440            owner,
3441            finality,
3442        )
3443        .await
3444    }
3445
3446    #[allow(clippy::too_many_arguments)]
3447    pub async fn order_book_cancel_by_client_id_strict_as(
3448        &self,
3449        caller: CallerContext,
3450        project_id: &str,
3451        scope_id: &str,
3452        instrument: &str,
3453        client_order_id: &str,
3454        owner: &str,
3455        finality: CommitFinality,
3456    ) -> Result<CommitResult, AedbError> {
3457        self.order_book_cancel_by_client_id_strict_as_internal(
3458            Some(caller),
3459            project_id,
3460            scope_id,
3461            instrument,
3462            client_order_id,
3463            owner,
3464            finality,
3465        )
3466        .await
3467    }
3468
    /// Strict cancel-by-client-id core shared by
    /// [`order_book_cancel_by_client_id_strict`] and its `_as` variant.
    ///
    /// Resolves `client_order_id` to the numeric order id via the client-id
    /// mapping key, performs the same ownership / cancellability checks as
    /// the order-id strict path, and pins BOTH keys (mapping and order) in
    /// the commit's read-set so any concurrent change to either aborts the
    /// commit.
    #[allow(clippy::too_many_arguments)]
    async fn order_book_cancel_by_client_id_strict_as_internal(
        &self,
        caller: Option<CallerContext>,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        client_order_id: &str,
        owner: &str,
        finality: CommitFinality,
    ) -> Result<CommitResult, AedbError> {
        // Snapshot lease: stable view for both lookups and the base sequence.
        let lease = self.acquire_snapshot(ConsistencyMode::AtLatest).await?;
        let cid_key = key_client_id(instrument, owner, client_order_id);
        let Some(cid_entry) = lease.view.keyspace.kv_get(project_id, scope_id, &cid_key) else {
            return Err(AedbError::Validation(format!(
                "strict cancel target not found: client_order_id={client_order_id}"
            )));
        };
        // The mapping value must be exactly the 8-byte big-endian order id.
        if cid_entry.value.len() != 8 {
            return Err(AedbError::Validation(
                "invalid client-order mapping encoding".into(),
            ));
        }
        let mut id_bytes = [0u8; 8];
        id_bytes.copy_from_slice(&cid_entry.value);
        let order_id = u64::from_be_bytes(id_bytes);
        let order_key = key_order(instrument, order_id);
        let Some(order_entry) = lease.view.keyspace.kv_get(project_id, scope_id, &order_key) else {
            return Err(AedbError::Validation(format!(
                "strict cancel target not found: order_id={order_id}"
            )));
        };
        // Orders are stored MessagePack-encoded (rmp_serde).
        let order: OrderRecord = rmp_serde::from_slice(&order_entry.value)
            .map_err(|e| AedbError::Decode(e.to_string()))?;
        if order.owner != owner {
            return Err(AedbError::PermissionDenied(
                "order ownership mismatch".into(),
            ));
        }
        // Only Open / PartiallyFilled orders with nonzero remaining quantity
        // are cancellable.
        if !matches!(
            order.status,
            crate::order_book::OrderStatus::Open | crate::order_book::OrderStatus::PartiallyFilled
        ) || u256_from_be(order.remaining_qty_be).is_zero()
        {
            return Err(AedbError::Validation(format!(
                "order not cancellable in current status: {:?}",
                order.status
            )));
        }
        let envelope = TransactionEnvelope {
            caller,
            idempotency_key: None,
            write_class: WriteClass::Standard,
            assertions: Vec::new(),
            // Pin both the client-id mapping and the order record at the
            // versions observed above; a change to either aborts the commit.
            read_set: ReadSet {
                points: vec![
                    ReadSetEntry {
                        key: ReadKey::KvKey {
                            project_id: project_id.to_string(),
                            scope_id: scope_id.to_string(),
                            key: cid_key,
                        },
                        version_at_read: cid_entry.version,
                    },
                    ReadSetEntry {
                        key: ReadKey::KvKey {
                            project_id: project_id.to_string(),
                            scope_id: scope_id.to_string(),
                            key: order_key,
                        },
                        version_at_read: order_entry.version,
                    },
                ],
                ranges: Vec::new(),
            },
            // The mutation keeps order_id = 0 and carries the client id, the
            // same shape as the non-strict by-client-id cancel.
            write_intent: WriteIntent {
                mutations: vec![Mutation::OrderBookCancel {
                    project_id: project_id.to_string(),
                    scope_id: scope_id.to_string(),
                    instrument: instrument.to_string(),
                    order_id: 0,
                    client_order_id: Some(client_order_id.to_string()),
                    owner: owner.to_string(),
                }],
            },
            // Anchor validation at the snapshot's sequence.
            base_seq: lease.view.seq,
        };
        self.commit_envelope_with_finality(envelope, finality).await
    }
3558
3559    #[allow(clippy::too_many_arguments)]
3560    pub async fn order_book_cancel_replace_strict(
3561        &self,
3562        project_id: &str,
3563        scope_id: &str,
3564        instrument: &str,
3565        order_id: u64,
3566        owner: &str,
3567        new_price_ticks: Option<i64>,
3568        new_qty_be: Option<[u8; 32]>,
3569        new_time_in_force: Option<TimeInForce>,
3570        new_exec_instructions: Option<ExecInstruction>,
3571        finality: CommitFinality,
3572    ) -> Result<CommitResult, AedbError> {
3573        self.order_book_cancel_replace_strict_as_internal(
3574            None,
3575            project_id,
3576            scope_id,
3577            instrument,
3578            order_id,
3579            owner,
3580            new_price_ticks,
3581            new_qty_be,
3582            new_time_in_force,
3583            new_exec_instructions,
3584            finality,
3585        )
3586        .await
3587    }
3588
3589    #[allow(clippy::too_many_arguments)]
3590    pub async fn order_book_cancel_replace_strict_as(
3591        &self,
3592        caller: CallerContext,
3593        project_id: &str,
3594        scope_id: &str,
3595        instrument: &str,
3596        order_id: u64,
3597        owner: &str,
3598        new_price_ticks: Option<i64>,
3599        new_qty_be: Option<[u8; 32]>,
3600        new_time_in_force: Option<TimeInForce>,
3601        new_exec_instructions: Option<ExecInstruction>,
3602        finality: CommitFinality,
3603    ) -> Result<CommitResult, AedbError> {
3604        self.order_book_cancel_replace_strict_as_internal(
3605            Some(caller),
3606            project_id,
3607            scope_id,
3608            instrument,
3609            order_id,
3610            owner,
3611            new_price_ticks,
3612            new_qty_be,
3613            new_time_in_force,
3614            new_exec_instructions,
3615            finality,
3616        )
3617        .await
3618    }
3619
3620    #[allow(clippy::too_many_arguments)]
3621    async fn order_book_cancel_replace_strict_as_internal(
3622        &self,
3623        caller: Option<CallerContext>,
3624        project_id: &str,
3625        scope_id: &str,
3626        instrument: &str,
3627        order_id: u64,
3628        owner: &str,
3629        new_price_ticks: Option<i64>,
3630        new_qty_be: Option<[u8; 32]>,
3631        new_time_in_force: Option<TimeInForce>,
3632        new_exec_instructions: Option<ExecInstruction>,
3633        finality: CommitFinality,
3634    ) -> Result<CommitResult, AedbError> {
3635        let (order_key, version, base_seq) = self
3636            .order_book_strict_cancellable_version(
3637                project_id, scope_id, instrument, order_id, owner,
3638            )
3639            .await?;
3640        let envelope = TransactionEnvelope {
3641            caller,
3642            idempotency_key: None,
3643            write_class: WriteClass::Standard,
3644            assertions: Vec::new(),
3645            read_set: ReadSet {
3646                points: vec![ReadSetEntry {
3647                    key: ReadKey::KvKey {
3648                        project_id: project_id.to_string(),
3649                        scope_id: scope_id.to_string(),
3650                        key: order_key,
3651                    },
3652                    version_at_read: version,
3653                }],
3654                ranges: Vec::new(),
3655            },
3656            write_intent: WriteIntent {
3657                mutations: vec![Mutation::OrderBookCancelReplace {
3658                    project_id: project_id.to_string(),
3659                    scope_id: scope_id.to_string(),
3660                    instrument: instrument.to_string(),
3661                    order_id,
3662                    owner: owner.to_string(),
3663                    new_price_ticks,
3664                    new_qty_be,
3665                    new_time_in_force,
3666                    new_exec_instructions,
3667                }],
3668            },
3669            base_seq,
3670        };
3671        self.commit_envelope_with_finality(envelope, finality).await
3672    }
3673
3674    #[allow(clippy::too_many_arguments)]
3675    pub async fn order_book_reduce_strict(
3676        &self,
3677        project_id: &str,
3678        scope_id: &str,
3679        instrument: &str,
3680        order_id: u64,
3681        owner: &str,
3682        reduce_by_be: [u8; 32],
3683        finality: CommitFinality,
3684    ) -> Result<CommitResult, AedbError> {
3685        self.order_book_reduce_strict_as_internal(
3686            None,
3687            project_id,
3688            scope_id,
3689            instrument,
3690            order_id,
3691            owner,
3692            reduce_by_be,
3693            finality,
3694        )
3695        .await
3696    }
3697
3698    #[allow(clippy::too_many_arguments)]
3699    pub async fn order_book_reduce_strict_as(
3700        &self,
3701        caller: CallerContext,
3702        project_id: &str,
3703        scope_id: &str,
3704        instrument: &str,
3705        order_id: u64,
3706        owner: &str,
3707        reduce_by_be: [u8; 32],
3708        finality: CommitFinality,
3709    ) -> Result<CommitResult, AedbError> {
3710        self.order_book_reduce_strict_as_internal(
3711            Some(caller),
3712            project_id,
3713            scope_id,
3714            instrument,
3715            order_id,
3716            owner,
3717            reduce_by_be,
3718            finality,
3719        )
3720        .await
3721    }
3722
3723    #[allow(clippy::too_many_arguments)]
3724    async fn order_book_reduce_strict_as_internal(
3725        &self,
3726        caller: Option<CallerContext>,
3727        project_id: &str,
3728        scope_id: &str,
3729        instrument: &str,
3730        order_id: u64,
3731        owner: &str,
3732        reduce_by_be: [u8; 32],
3733        finality: CommitFinality,
3734    ) -> Result<CommitResult, AedbError> {
3735        let reduce_by = u256_from_be(reduce_by_be);
3736        if reduce_by.is_zero() {
3737            return Err(AedbError::Validation(
3738                "strict reduce requires reduce_by > 0".into(),
3739            ));
3740        }
3741        let (order_key, version, base_seq) = self
3742            .order_book_strict_cancellable_version(
3743                project_id, scope_id, instrument, order_id, owner,
3744            )
3745            .await?;
3746        let envelope = TransactionEnvelope {
3747            caller,
3748            idempotency_key: None,
3749            write_class: WriteClass::Standard,
3750            assertions: Vec::new(),
3751            read_set: ReadSet {
3752                points: vec![ReadSetEntry {
3753                    key: ReadKey::KvKey {
3754                        project_id: project_id.to_string(),
3755                        scope_id: scope_id.to_string(),
3756                        key: order_key,
3757                    },
3758                    version_at_read: version,
3759                }],
3760                ranges: Vec::new(),
3761            },
3762            write_intent: WriteIntent {
3763                mutations: vec![Mutation::OrderBookReduce {
3764                    project_id: project_id.to_string(),
3765                    scope_id: scope_id.to_string(),
3766                    instrument: instrument.to_string(),
3767                    order_id,
3768                    owner: owner.to_string(),
3769                    reduce_by_be,
3770                }],
3771            },
3772            base_seq,
3773        };
3774        self.commit_envelope_with_finality(envelope, finality).await
3775    }
3776
    /// Resolves an order that the strict cancel/replace/reduce paths may
    /// target, returning `(order_key, entry_version, snapshot_seq)`.
    ///
    /// Fails with `Validation` when the order does not exist or is no longer
    /// mutable, and with `PermissionDenied` when it is not owned by `owner`.
    async fn order_book_strict_cancellable_version(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        owner: &str,
    ) -> Result<(Vec<u8>, u64, u64), AedbError> {
        // Always read at the latest snapshot so the returned version/seq
        // reflect the most recent committed state.
        let lease = self.acquire_snapshot(ConsistencyMode::AtLatest).await?;
        let order_key = key_order(instrument, order_id);
        let Some(entry) = lease.view.keyspace.kv_get(project_id, scope_id, &order_key) else {
            return Err(AedbError::Validation(format!(
                "strict target not found: order_id={order_id}"
            )));
        };
        // Order records are stored MessagePack-encoded (rmp_serde); decode
        // failures surface as AedbError::Decode.
        let order: OrderRecord =
            rmp_serde::from_slice(&entry.value).map_err(|e| AedbError::Decode(e.to_string()))?;
        // Ownership is checked before mutability so a non-owner gets a
        // PermissionDenied rather than a status error.
        if order.owner != owner {
            return Err(AedbError::PermissionDenied(
                "order ownership mismatch".into(),
            ));
        }
        // Only Open or PartiallyFilled orders with a non-zero remaining
        // quantity are considered mutable.
        if !matches!(
            order.status,
            crate::order_book::OrderStatus::Open | crate::order_book::OrderStatus::PartiallyFilled
        ) || u256_from_be(order.remaining_qty_be).is_zero()
        {
            return Err(AedbError::Validation(format!(
                "order not mutable in current status: {:?}",
                order.status
            )));
        }
        Ok((order_key, entry.version, lease.view.seq))
    }
3811
3812    #[allow(clippy::too_many_arguments)]
3813    pub async fn order_book_cancel_replace(
3814        &self,
3815        project_id: &str,
3816        scope_id: &str,
3817        instrument: &str,
3818        order_id: u64,
3819        owner: &str,
3820        new_price_ticks: Option<i64>,
3821        new_qty_be: Option<[u8; 32]>,
3822        new_time_in_force: Option<TimeInForce>,
3823        new_exec_instructions: Option<ExecInstruction>,
3824    ) -> Result<CommitResult, AedbError> {
3825        self.commit_prevalidated_internal(
3826            "order_book_cancel_replace",
3827            Mutation::OrderBookCancelReplace {
3828                project_id: project_id.to_string(),
3829                scope_id: scope_id.to_string(),
3830                instrument: instrument.to_string(),
3831                order_id,
3832                owner: owner.to_string(),
3833                new_price_ticks,
3834                new_qty_be,
3835                new_time_in_force,
3836                new_exec_instructions,
3837            },
3838        )
3839        .await
3840    }
3841
3842    #[allow(clippy::too_many_arguments)]
3843    pub async fn order_book_cancel_replace_as(
3844        &self,
3845        caller: CallerContext,
3846        project_id: &str,
3847        scope_id: &str,
3848        instrument: &str,
3849        order_id: u64,
3850        owner: &str,
3851        new_price_ticks: Option<i64>,
3852        new_qty_be: Option<[u8; 32]>,
3853        new_time_in_force: Option<TimeInForce>,
3854        new_exec_instructions: Option<ExecInstruction>,
3855    ) -> Result<CommitResult, AedbError> {
3856        self.commit_as(
3857            caller,
3858            Mutation::OrderBookCancelReplace {
3859                project_id: project_id.to_string(),
3860                scope_id: scope_id.to_string(),
3861                instrument: instrument.to_string(),
3862                order_id,
3863                owner: owner.to_string(),
3864                new_price_ticks,
3865                new_qty_be,
3866                new_time_in_force,
3867                new_exec_instructions,
3868            },
3869        )
3870        .await
3871    }
3872
3873    #[allow(clippy::too_many_arguments)]
3874    pub async fn order_book_mass_cancel(
3875        &self,
3876        project_id: &str,
3877        scope_id: &str,
3878        instrument: &str,
3879        owner: &str,
3880        side: Option<OrderSide>,
3881        owner_filter: Option<String>,
3882        price_range_ticks: Option<(i64, i64)>,
3883    ) -> Result<CommitResult, AedbError> {
3884        self.commit_prevalidated_internal(
3885            "order_book_mass_cancel",
3886            Mutation::OrderBookMassCancel {
3887                project_id: project_id.to_string(),
3888                scope_id: scope_id.to_string(),
3889                instrument: instrument.to_string(),
3890                owner: owner.to_string(),
3891                side,
3892                owner_filter,
3893                price_range_ticks,
3894            },
3895        )
3896        .await
3897    }
3898
3899    #[allow(clippy::too_many_arguments)]
3900    pub async fn order_book_mass_cancel_as(
3901        &self,
3902        caller: CallerContext,
3903        project_id: &str,
3904        scope_id: &str,
3905        instrument: &str,
3906        owner: &str,
3907        side: Option<OrderSide>,
3908        owner_filter: Option<String>,
3909        price_range_ticks: Option<(i64, i64)>,
3910    ) -> Result<CommitResult, AedbError> {
3911        self.commit_as(
3912            caller,
3913            Mutation::OrderBookMassCancel {
3914                project_id: project_id.to_string(),
3915                scope_id: scope_id.to_string(),
3916                instrument: instrument.to_string(),
3917                owner: owner.to_string(),
3918                side,
3919                owner_filter,
3920                price_range_ticks,
3921            },
3922        )
3923        .await
3924    }
3925
3926    pub async fn order_book_reduce(
3927        &self,
3928        project_id: &str,
3929        scope_id: &str,
3930        instrument: &str,
3931        order_id: u64,
3932        owner: &str,
3933        reduce_by_be: [u8; 32],
3934    ) -> Result<CommitResult, AedbError> {
3935        self.commit_prevalidated_internal(
3936            "order_book_reduce",
3937            Mutation::OrderBookReduce {
3938                project_id: project_id.to_string(),
3939                scope_id: scope_id.to_string(),
3940                instrument: instrument.to_string(),
3941                order_id,
3942                owner: owner.to_string(),
3943                reduce_by_be,
3944            },
3945        )
3946        .await
3947    }
3948
3949    #[allow(clippy::too_many_arguments)]
3950    pub async fn order_book_reduce_as(
3951        &self,
3952        caller: CallerContext,
3953        project_id: &str,
3954        scope_id: &str,
3955        instrument: &str,
3956        order_id: u64,
3957        owner: &str,
3958        reduce_by_be: [u8; 32],
3959    ) -> Result<CommitResult, AedbError> {
3960        self.commit_as(
3961            caller,
3962            Mutation::OrderBookReduce {
3963                project_id: project_id.to_string(),
3964                scope_id: scope_id.to_string(),
3965                instrument: instrument.to_string(),
3966                order_id,
3967                owner: owner.to_string(),
3968                reduce_by_be,
3969            },
3970        )
3971        .await
3972    }
3973
3974    pub async fn order_book_match_internal(
3975        &self,
3976        project_id: &str,
3977        scope_id: &str,
3978        instrument: &str,
3979        fills: Vec<FillSpec>,
3980    ) -> Result<CommitResult, AedbError> {
3981        self.commit_prevalidated_internal(
3982            "order_book_match_internal",
3983            Mutation::OrderBookMatch {
3984                project_id: project_id.to_string(),
3985                scope_id: scope_id.to_string(),
3986                instrument: instrument.to_string(),
3987                fills,
3988            },
3989        )
3990        .await
3991    }
3992
3993    pub async fn order_book_match_internal_as(
3994        &self,
3995        caller: CallerContext,
3996        project_id: &str,
3997        scope_id: &str,
3998        instrument: &str,
3999        fills: Vec<FillSpec>,
4000    ) -> Result<CommitResult, AedbError> {
4001        self.commit_as(
4002            caller,
4003            Mutation::OrderBookMatch {
4004                project_id: project_id.to_string(),
4005                scope_id: scope_id.to_string(),
4006                instrument: instrument.to_string(),
4007                fills,
4008            },
4009        )
4010        .await
4011    }
4012
4013    pub async fn order_book_top_n(
4014        &self,
4015        project_id: &str,
4016        scope_id: &str,
4017        instrument: &str,
4018        depth: u32,
4019        consistency: ConsistencyMode,
4020        caller: &CallerContext,
4021    ) -> Result<OrderBookDepth, QueryError> {
4022        ensure_query_caller_allowed(caller)?;
4023        let lease = self
4024            .acquire_snapshot(consistency)
4025            .await
4026            .map_err(QueryError::from)?;
4027        let prefix = format!("ob:{instrument}:");
4028        if !lease.view.catalog.has_kv_read_permission(
4029            &caller.caller_id,
4030            project_id,
4031            scope_id,
4032            prefix.as_bytes(),
4033        ) {
4034            return Err(QueryError::PermissionDenied {
4035                permission: format!("KvRead({project_id}.{scope_id})"),
4036                scope: caller.caller_id.clone(),
4037            });
4038        }
4039        read_top_n(
4040            &lease.view.keyspace,
4041            project_id,
4042            scope_id,
4043            instrument,
4044            depth as usize,
4045            lease.view.seq,
4046        )
4047        .map_err(QueryError::from)
4048    }
4049
    /// Looks up a single order record by id at the requested consistency.
    ///
    /// Requires the caller to pass the generic query gate and hold KV-read
    /// permission on the instrument's `ob:{instrument}:` key prefix. A found
    /// order is only returned when the caller owns it or holds `GlobalAdmin`;
    /// otherwise the call fails with `PermissionDenied`.
    pub async fn order_status(
        &self,
        project_id: &str,
        scope_id: &str,
        instrument: &str,
        order_id: u64,
        consistency: ConsistencyMode,
        caller: &CallerContext,
    ) -> Result<Option<OrderRecord>, QueryError> {
        ensure_query_caller_allowed(caller)?;
        let lease = self
            .acquire_snapshot(consistency)
            .await
            .map_err(QueryError::from)?;
        // KV-read permission is evaluated against the order-book key prefix.
        let prefix = format!("ob:{instrument}:");
        if !lease.view.catalog.has_kv_read_permission(
            &caller.caller_id,
            project_id,
            scope_id,
            prefix.as_bytes(),
        ) {
            return Err(QueryError::PermissionDenied {
                permission: format!("KvRead({project_id}.{scope_id})"),
                scope: caller.caller_id.clone(),
            });
        }
        let order = read_order_status(
            &lease.view.keyspace,
            project_id,
            scope_id,
            instrument,
            order_id,
        )
        .map_err(QueryError::from)?;
        // Ownership is checked only when a record was found; non-admin
        // callers may not read another owner's order.
        if let Some(order) = &order {
            let admin = lease
                .view
                .catalog
                .has_permission(&caller.caller_id, &Permission::GlobalAdmin);
            if !admin && order.owner != caller.caller_id {
                return Err(QueryError::PermissionDenied {
                    permission: "order_status(owner match)".into(),
                    scope: caller.caller_id.clone(),
                });
            }
        }
        Ok(order)
    }
4098
4099    pub async fn open_orders(
4100        &self,
4101        project_id: &str,
4102        scope_id: &str,
4103        instrument: &str,
4104        owner: &str,
4105        consistency: ConsistencyMode,
4106        caller: &CallerContext,
4107    ) -> Result<Vec<OrderRecord>, QueryError> {
4108        ensure_query_caller_allowed(caller)?;
4109        let lease = self
4110            .acquire_snapshot(consistency)
4111            .await
4112            .map_err(QueryError::from)?;
4113        let prefix = format!("ob:{instrument}:");
4114        if !lease.view.catalog.has_kv_read_permission(
4115            &caller.caller_id,
4116            project_id,
4117            scope_id,
4118            prefix.as_bytes(),
4119        ) {
4120            return Err(QueryError::PermissionDenied {
4121                permission: format!("KvRead({project_id}.{scope_id})"),
4122                scope: caller.caller_id.clone(),
4123            });
4124        }
4125        let admin = lease
4126            .view
4127            .catalog
4128            .has_permission(&caller.caller_id, &Permission::GlobalAdmin);
4129        if !admin && owner != caller.caller_id {
4130            return Err(QueryError::PermissionDenied {
4131                permission: "open_orders(owner match)".into(),
4132                scope: caller.caller_id.clone(),
4133            });
4134        }
4135        read_open_orders(
4136            &lease.view.keyspace,
4137            project_id,
4138            scope_id,
4139            instrument,
4140            owner,
4141        )
4142        .map_err(QueryError::from)
4143    }
4144
4145    pub async fn recent_trades(
4146        &self,
4147        project_id: &str,
4148        scope_id: &str,
4149        instrument: &str,
4150        limit: u32,
4151        consistency: ConsistencyMode,
4152        caller: &CallerContext,
4153    ) -> Result<Vec<crate::order_book::FillRecord>, QueryError> {
4154        ensure_query_caller_allowed(caller)?;
4155        let lease = self
4156            .acquire_snapshot(consistency)
4157            .await
4158            .map_err(QueryError::from)?;
4159        let prefix = format!("ob:{instrument}:");
4160        if !lease.view.catalog.has_kv_read_permission(
4161            &caller.caller_id,
4162            project_id,
4163            scope_id,
4164            prefix.as_bytes(),
4165        ) {
4166            return Err(QueryError::PermissionDenied {
4167                permission: format!("KvRead({project_id}.{scope_id})"),
4168                scope: caller.caller_id.clone(),
4169            });
4170        }
4171        read_recent_trades(
4172            &lease.view.keyspace,
4173            project_id,
4174            scope_id,
4175            instrument,
4176            limit as usize,
4177        )
4178        .map_err(QueryError::from)
4179    }
4180
4181    pub async fn spread(
4182        &self,
4183        project_id: &str,
4184        scope_id: &str,
4185        instrument: &str,
4186        consistency: ConsistencyMode,
4187        caller: &CallerContext,
4188    ) -> Result<Spread, QueryError> {
4189        ensure_query_caller_allowed(caller)?;
4190        let lease = self
4191            .acquire_snapshot(consistency)
4192            .await
4193            .map_err(QueryError::from)?;
4194        let prefix = format!("ob:{instrument}:");
4195        if !lease.view.catalog.has_kv_read_permission(
4196            &caller.caller_id,
4197            project_id,
4198            scope_id,
4199            prefix.as_bytes(),
4200        ) {
4201            return Err(QueryError::PermissionDenied {
4202                permission: format!("KvRead({project_id}.{scope_id})"),
4203                scope: caller.caller_id.clone(),
4204            });
4205        }
4206        read_spread(
4207            &lease.view.keyspace,
4208            project_id,
4209            scope_id,
4210            instrument,
4211            lease.view.seq,
4212        )
4213        .map_err(QueryError::from)
4214    }
4215
4216    pub async fn order_book_last_execution_report(
4217        &self,
4218        project_id: &str,
4219        scope_id: &str,
4220        instrument: &str,
4221        consistency: ConsistencyMode,
4222        caller: &CallerContext,
4223    ) -> Result<Option<crate::order_book::ExecutionReport>, QueryError> {
4224        ensure_query_caller_allowed(caller)?;
4225        let lease = self
4226            .acquire_snapshot(consistency)
4227            .await
4228            .map_err(QueryError::from)?;
4229        let prefix = format!("ob:{instrument}:");
4230        if !lease.view.catalog.has_kv_read_permission(
4231            &caller.caller_id,
4232            project_id,
4233            scope_id,
4234            prefix.as_bytes(),
4235        ) {
4236            return Err(QueryError::PermissionDenied {
4237                permission: format!("KvRead({project_id}.{scope_id})"),
4238                scope: caller.caller_id.clone(),
4239            });
4240        }
4241        read_last_execution_report(&lease.view.keyspace, project_id, scope_id, instrument)
4242            .map_err(QueryError::from)
4243    }
4244
4245    pub async fn compare_and_swap(
4246        &self,
4247        project_id: &str,
4248        scope_id: &str,
4249        table_name: &str,
4250        primary_key: Vec<Value>,
4251        row: Row,
4252        expected_seq: u64,
4253    ) -> Result<CommitResult, AedbError> {
4254        self.commit_envelope(TransactionEnvelope {
4255            caller: None,
4256            idempotency_key: None,
4257            write_class: crate::commit::tx::WriteClass::Standard,
4258            assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4259                project_id: project_id.to_string(),
4260                scope_id: scope_id.to_string(),
4261                table_name: table_name.to_string(),
4262                primary_key: primary_key.clone(),
4263                expected_seq,
4264            }],
4265            read_set: crate::commit::tx::ReadSet::default(),
4266            write_intent: crate::commit::tx::WriteIntent {
4267                mutations: vec![Mutation::Upsert {
4268                    project_id: project_id.to_string(),
4269                    scope_id: scope_id.to_string(),
4270                    table_name: table_name.to_string(),
4271                    primary_key,
4272                    row,
4273                }],
4274            },
4275            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4276        })
4277        .await
4278    }
4279
4280    pub async fn compare_and_swap_as_with(
4281        &self,
4282        request: CompareAndSwapRequest,
4283    ) -> Result<CommitResult, AedbError> {
4284        self.commit_envelope(TransactionEnvelope {
4285            caller: Some(request.caller),
4286            idempotency_key: None,
4287            write_class: crate::commit::tx::WriteClass::Standard,
4288            assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4289                project_id: request.project_id.clone(),
4290                scope_id: request.scope_id.clone(),
4291                table_name: request.table_name.clone(),
4292                primary_key: request.primary_key.clone(),
4293                expected_seq: request.expected_seq,
4294            }],
4295            read_set: crate::commit::tx::ReadSet::default(),
4296            write_intent: crate::commit::tx::WriteIntent {
4297                mutations: vec![Mutation::Upsert {
4298                    project_id: request.project_id,
4299                    scope_id: request.scope_id,
4300                    table_name: request.table_name,
4301                    primary_key: request.primary_key,
4302                    row: request.row,
4303                }],
4304            },
4305            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4306        })
4307        .await
4308    }
4309
4310    #[allow(clippy::too_many_arguments)]
4311    pub async fn compare_and_swap_as(
4312        &self,
4313        caller: CallerContext,
4314        project_id: &str,
4315        scope_id: &str,
4316        table_name: &str,
4317        primary_key: Vec<Value>,
4318        row: Row,
4319        expected_seq: u64,
4320    ) -> Result<CommitResult, AedbError> {
4321        self.compare_and_swap_as_with(CompareAndSwapRequest {
4322            caller,
4323            project_id: project_id.to_string(),
4324            scope_id: scope_id.to_string(),
4325            table_name: table_name.to_string(),
4326            primary_key,
4327            row,
4328            expected_seq,
4329        })
4330        .await
4331    }
4332
4333    #[allow(clippy::too_many_arguments)]
4334    pub async fn compare_and_inc_u256(
4335        &self,
4336        project_id: &str,
4337        scope_id: &str,
4338        table_name: &str,
4339        primary_key: Vec<Value>,
4340        column: &str,
4341        amount_be: [u8; 32],
4342        expected_seq: u64,
4343    ) -> Result<CommitResult, AedbError> {
4344        self.commit_envelope(TransactionEnvelope {
4345            caller: None,
4346            idempotency_key: None,
4347            write_class: crate::commit::tx::WriteClass::Standard,
4348            assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4349                project_id: project_id.to_string(),
4350                scope_id: scope_id.to_string(),
4351                table_name: table_name.to_string(),
4352                primary_key: primary_key.clone(),
4353                expected_seq,
4354            }],
4355            read_set: crate::commit::tx::ReadSet::default(),
4356            write_intent: crate::commit::tx::WriteIntent {
4357                mutations: vec![Mutation::TableIncU256 {
4358                    project_id: project_id.to_string(),
4359                    scope_id: scope_id.to_string(),
4360                    table_name: table_name.to_string(),
4361                    primary_key,
4362                    column: column.to_string(),
4363                    amount_be,
4364                }],
4365            },
4366            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4367        })
4368        .await
4369    }
4370
4371    #[allow(clippy::too_many_arguments)]
4372    pub async fn compare_and_inc_u256_as(
4373        &self,
4374        caller: CallerContext,
4375        project_id: &str,
4376        scope_id: &str,
4377        table_name: &str,
4378        primary_key: Vec<Value>,
4379        column: &str,
4380        amount_be: [u8; 32],
4381        expected_seq: u64,
4382    ) -> Result<CommitResult, AedbError> {
4383        self.commit_envelope(TransactionEnvelope {
4384            caller: Some(caller),
4385            idempotency_key: None,
4386            write_class: crate::commit::tx::WriteClass::Standard,
4387            assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4388                project_id: project_id.to_string(),
4389                scope_id: scope_id.to_string(),
4390                table_name: table_name.to_string(),
4391                primary_key: primary_key.clone(),
4392                expected_seq,
4393            }],
4394            read_set: crate::commit::tx::ReadSet::default(),
4395            write_intent: crate::commit::tx::WriteIntent {
4396                mutations: vec![Mutation::TableIncU256 {
4397                    project_id: project_id.to_string(),
4398                    scope_id: scope_id.to_string(),
4399                    table_name: table_name.to_string(),
4400                    primary_key,
4401                    column: column.to_string(),
4402                    amount_be,
4403                }],
4404            },
4405            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4406        })
4407        .await
4408    }
4409
4410    #[allow(clippy::too_many_arguments)]
4411    pub async fn compare_and_dec_u256(
4412        &self,
4413        project_id: &str,
4414        scope_id: &str,
4415        table_name: &str,
4416        primary_key: Vec<Value>,
4417        column: &str,
4418        amount_be: [u8; 32],
4419        expected_seq: u64,
4420    ) -> Result<CommitResult, AedbError> {
4421        self.commit_envelope(TransactionEnvelope {
4422            caller: None,
4423            idempotency_key: None,
4424            write_class: crate::commit::tx::WriteClass::Standard,
4425            assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4426                project_id: project_id.to_string(),
4427                scope_id: scope_id.to_string(),
4428                table_name: table_name.to_string(),
4429                primary_key: primary_key.clone(),
4430                expected_seq,
4431            }],
4432            read_set: crate::commit::tx::ReadSet::default(),
4433            write_intent: crate::commit::tx::WriteIntent {
4434                mutations: vec![Mutation::TableDecU256 {
4435                    project_id: project_id.to_string(),
4436                    scope_id: scope_id.to_string(),
4437                    table_name: table_name.to_string(),
4438                    primary_key,
4439                    column: column.to_string(),
4440                    amount_be,
4441                }],
4442            },
4443            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4444        })
4445        .await
4446    }
4447
4448    #[allow(clippy::too_many_arguments)]
4449    pub async fn compare_and_dec_u256_as(
4450        &self,
4451        caller: CallerContext,
4452        project_id: &str,
4453        scope_id: &str,
4454        table_name: &str,
4455        primary_key: Vec<Value>,
4456        column: &str,
4457        amount_be: [u8; 32],
4458        expected_seq: u64,
4459    ) -> Result<CommitResult, AedbError> {
4460        self.commit_envelope(TransactionEnvelope {
4461            caller: Some(caller),
4462            idempotency_key: None,
4463            write_class: crate::commit::tx::WriteClass::Standard,
4464            assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
4465                project_id: project_id.to_string(),
4466                scope_id: scope_id.to_string(),
4467                table_name: table_name.to_string(),
4468                primary_key: primary_key.clone(),
4469                expected_seq,
4470            }],
4471            read_set: crate::commit::tx::ReadSet::default(),
4472            write_intent: crate::commit::tx::WriteIntent {
4473                mutations: vec![Mutation::TableDecU256 {
4474                    project_id: project_id.to_string(),
4475                    scope_id: scope_id.to_string(),
4476                    table_name: table_name.to_string(),
4477                    primary_key,
4478                    column: column.to_string(),
4479                    amount_be,
4480                }],
4481            },
4482            base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
4483        })
4484        .await
4485    }
4486
    /// Shared engine for the mutate-where-returning family: select the first
    /// row matching `predicate`, apply `updates` to it, and commit the result
    /// guarded by a row-version assertion. Retries the whole read/modify/write
    /// cycle on version conflicts, up to `MAX_MUTATE_WHERE_RETURNING_RETRIES`.
    ///
    /// Returns `Ok(None)` when no row matches; `Err(Conflict)` when the retry
    /// budget is exhausted.
    async fn mutate_where_returning_inner(
        &self,
        caller: Option<CallerContext>,
        project_id: &str,
        scope_id: &str,
        table_name: &str,
        predicate: Expr,
        updates: Vec<(String, TableUpdateExpr)>,
    ) -> Result<Option<MutateWhereReturningResult>, AedbError> {
        // Optimistic-concurrency loop: each iteration re-reads and re-applies.
        for _ in 0..MAX_MUTATE_WHERE_RETURNING_RETRIES {
            let (_, catalog_snapshot, _) = self.executor.snapshot_state().await;
            let schema = catalog_snapshot
                .tables
                .get(&(namespace_key(project_id, scope_id), table_name.to_string()))
                .cloned()
                .ok_or_else(|| AedbError::NotFound {
                    resource_type: ErrorResourceType::Table,
                    resource_id: format!("{project_id}.{scope_id}.{table_name}"),
                })?;

            // Pick one candidate row; ordering by the primary key makes the
            // choice deterministic when several rows satisfy the predicate.
            let mut query = Query::select(&["*"])
                .from(table_name)
                .where_(predicate.clone())
                .limit(1);
            for pk in &schema.primary_key {
                query = query.order_by(pk, Order::Asc);
            }
            let query_result = self
                .query_with_options_as(
                    caller.as_ref(),
                    project_id,
                    scope_id,
                    query,
                    QueryOptions {
                        consistency: ConsistencyMode::AtLatest,
                        allow_full_scan: true,
                        ..QueryOptions::default()
                    },
                )
                .await
                .map_err(query_error_to_aedb)?;

            // No matching row is a clean miss, not an error.
            let Some(before) = query_result.rows.first().cloned() else {
                return Ok(None);
            };
            let primary_key = extract_primary_key_values(&schema, &before)?;
            // Pin the exact snapshot the row was read at so the version lookup
            // below is consistent with `before`.
            let snapshot_seq = query_result.snapshot_seq;
            let lease = self
                .acquire_snapshot(ConsistencyMode::AtSeq(snapshot_seq))
                .await?;
            let expected_seq = lease
                .view
                .keyspace
                .table(project_id, scope_id, table_name)
                .and_then(|table| {
                    table
                        .row_versions
                        .get(&crate::storage::encoded_key::EncodedKey::from_values(
                            &primary_key,
                        ))
                        .copied()
                })
                .unwrap_or(0);
            // No version recorded for the key at the pinned snapshot — retry
            // with a fresh read instead of committing against seq 0.
            if expected_seq == 0 {
                continue;
            }
            let after = apply_table_update_exprs(&schema, &before, &updates)?;

            // Upsert the full updated row, asserting the version we read so a
            // concurrent writer forces a retry rather than a lost update.
            let commit = self
                .commit_envelope(TransactionEnvelope {
                    caller: caller.clone(),
                    idempotency_key: None,
                    write_class: WriteClass::Standard,
                    assertions: vec![crate::commit::tx::ReadAssertion::RowVersion {
                        project_id: project_id.to_string(),
                        scope_id: scope_id.to_string(),
                        table_name: table_name.to_string(),
                        primary_key: primary_key.clone(),
                        expected_seq,
                    }],
                    read_set: ReadSet::default(),
                    write_intent: WriteIntent {
                        mutations: vec![Mutation::Upsert {
                            project_id: project_id.to_string(),
                            scope_id: scope_id.to_string(),
                            table_name: table_name.to_string(),
                            primary_key: primary_key.clone(),
                            row: after.clone(),
                        }],
                    },
                    base_seq: self.snapshot_probe(ConsistencyMode::AtLatest).await?,
                })
                .await;

            match commit {
                Ok(commit) => {
                    return Ok(Some(MutateWhereReturningResult {
                        commit,
                        primary_key,
                        before,
                        after,
                    }));
                }
                // Version assertion lost the race — take another lap.
                Err(AedbError::AssertionFailed { .. }) => continue,
                Err(err) => return Err(err),
            }
        }
        Err(AedbError::Conflict(
            "mutate_where_returning exceeded retry budget".into(),
        ))
    }
4598
4599    async fn snapshot_for_consistency(
4600        &self,
4601        consistency: ConsistencyMode,
4602    ) -> Result<SnapshotReadView, AedbError> {
4603        match consistency {
4604            ConsistencyMode::AtLatest => {
4605                let (keyspace, catalog, seq) = self.executor.snapshot_state().await;
4606                Ok(SnapshotReadView {
4607                    keyspace: Arc::new(keyspace),
4608                    catalog: Arc::new(catalog),
4609                    seq,
4610                })
4611            }
4612            ConsistencyMode::AtSeq(requested) => {
4613                match self.executor.snapshot_at_seq(requested).await {
4614                    Ok(view) => Ok(view),
4615                    Err(e) => {
4616                        if let Some(view) = self.recovery_cache.lock().get(requested) {
4617                            return Ok(view);
4618                        }
4619                        if !should_fallback_to_recovery(&e) {
4620                            return Err(e);
4621                        }
4622                        warn!(
4623                            seq = requested,
4624                            "AtSeq falling back to disk recovery; version not in ring buffer"
4625                        );
4626                        let recovered =
4627                            recover_at_seq_with_config(&self.dir, requested, &self._config)?;
4628                        let view = SnapshotReadView {
4629                            keyspace: Arc::new(recovered.keyspace.snapshot()),
4630                            catalog: Arc::new(recovered.catalog.snapshot()),
4631                            seq: recovered.current_seq,
4632                        };
4633                        self.recovery_cache.lock().put(requested, view.clone());
4634                        Ok(view)
4635                    }
4636                }
4637            }
4638            ConsistencyMode::AtCheckpoint => {
4639                let manifest = crate::manifest::atomic::load_manifest_signed(
4640                    &self.dir,
4641                    self._config.hmac_key(),
4642                )?;
4643                let Some(cp) = manifest.checkpoints.last() else {
4644                    return Err(AedbError::Unavailable {
4645                        message: "no checkpoint available".into(),
4646                    });
4647                };
4648                match self.executor.snapshot_at_seq(cp.seq).await {
4649                    Ok(view) => Ok(view),
4650                    Err(e) => {
4651                        if let Some(view) = self.recovery_cache.lock().get(cp.seq) {
4652                            return Ok(view);
4653                        }
4654                        if !should_fallback_to_recovery(&e) {
4655                            return Err(e);
4656                        }
4657                        let recovered =
4658                            recover_at_seq_with_config(&self.dir, cp.seq, &self._config)?;
4659                        let view = SnapshotReadView {
4660                            keyspace: Arc::new(recovered.keyspace.snapshot()),
4661                            catalog: Arc::new(recovered.catalog.snapshot()),
4662                            seq: recovered.current_seq,
4663                        };
4664                        self.recovery_cache.lock().put(cp.seq, view.clone());
4665                        Ok(view)
4666                    }
4667                }
4668            }
4669        }
4670    }
4671
4672    async fn acquire_snapshot(
4673        &self,
4674        consistency: ConsistencyMode,
4675    ) -> Result<SnapshotLease, AedbError> {
4676        let view = self.snapshot_for_consistency(consistency).await?;
4677        let mut mgr = self.snapshot_manager.lock();
4678        let handle = mgr.acquire_bounded(view.clone(), self._config.max_concurrent_snapshots)?;
4679        let checked = mgr
4680            .get_checked(handle, self._config.max_snapshot_age_ms)?
4681            .clone();
4682        Ok(SnapshotLease {
4683            manager: Arc::clone(&self.snapshot_manager),
4684            handle,
4685            view: checked,
4686        })
4687    }
4688
4689    pub async fn preflight(&self, mutation: Mutation) -> PreflightResult {
4690        let (snapshot, catalog, _) = self.executor.snapshot_state().await;
4691        preflight(&snapshot, &catalog, &mutation)
4692    }
4693
4694    pub async fn preflight_plan(&self, mutation: Mutation) -> crate::commit::tx::PreflightPlan {
4695        let (snapshot, catalog, base_seq) = self.executor.snapshot_state().await;
4696        preflight_plan(&snapshot, &catalog, &mutation, base_seq)
4697    }
4698
4699    pub async fn preflight_as(
4700        &self,
4701        caller: &CallerContext,
4702        mutation: Mutation,
4703    ) -> Result<PreflightResult, AedbError> {
4704        ensure_external_caller_allowed(caller)?;
4705        let (snapshot, catalog, _) = self.executor.snapshot_state().await;
4706        validate_permissions(&catalog, Some(caller), &mutation)?;
4707        Ok(preflight(&snapshot, &catalog, &mutation))
4708    }
4709
4710    pub async fn preflight_plan_as(
4711        &self,
4712        caller: &CallerContext,
4713        mutation: Mutation,
4714    ) -> Result<crate::commit::tx::PreflightPlan, AedbError> {
4715        ensure_external_caller_allowed(caller)?;
4716        let (snapshot, catalog, base_seq) = self.executor.snapshot_state().await;
4717        validate_permissions(&catalog, Some(caller), &mutation)?;
4718        Ok(preflight_plan(&snapshot, &catalog, &mutation, base_seq))
4719    }
4720
4721    pub async fn commit_ddl(&self, op: DdlOperation) -> Result<DdlResult, AedbError> {
4722        let (_, catalog, _) = self.executor.snapshot_state().await;
4723        let applied = ddl_would_apply(&catalog, &op);
4724        let result = self.commit(Mutation::Ddl(op)).await?;
4725        Ok(DdlResult {
4726            applied,
4727            seq: result.commit_seq,
4728        })
4729    }
4730
4731    pub async fn commit_ddl_batch(
4732        &self,
4733        ddl_ops: Vec<DdlOperation>,
4734    ) -> Result<DdlBatchResult, AedbError> {
4735        self.commit_ddl_batch_with_mode(ddl_ops, DdlBatchOrderMode::AsProvided)
4736            .await
4737    }
4738
4739    pub async fn commit_ddl_batch_dependency_aware(
4740        &self,
4741        ddl_ops: Vec<DdlOperation>,
4742    ) -> Result<DdlBatchResult, AedbError> {
4743        self.commit_ddl_batch_with_mode(ddl_ops, DdlBatchOrderMode::DependencyAware)
4744            .await
4745    }
4746
4747    pub async fn commit_ddl_batch_with_mode(
4748        &self,
4749        ddl_ops: Vec<DdlOperation>,
4750        mode: DdlBatchOrderMode,
4751    ) -> Result<DdlBatchResult, AedbError> {
4752        if ddl_ops.is_empty() {
4753            return Err(AedbError::InvalidConfig {
4754                message: "ddl batch cannot be empty".into(),
4755            });
4756        }
4757        let ddl_ops = match mode {
4758            DdlBatchOrderMode::AsProvided => ddl_ops,
4759            DdlBatchOrderMode::DependencyAware => order_ddl_ops_for_batch(ddl_ops)?,
4760        };
4761        let (_, catalog, _) = self.executor.snapshot_state().await;
4762        let mut planned_catalog = catalog;
4763        let mut results = Vec::with_capacity(ddl_ops.len());
4764        for op in &ddl_ops {
4765            let applied = ddl_would_apply(&planned_catalog, op);
4766            planned_catalog.apply_ddl(op.clone())?;
4767            results.push(DdlResult { applied, seq: 0 });
4768        }
4769        let mutations = ddl_ops.into_iter().map(Mutation::Ddl).collect();
4770        let base_seq = self.executor.current_seq().await;
4771        let committed = self
4772            .commit_envelope(TransactionEnvelope {
4773                caller: None,
4774                idempotency_key: None,
4775                write_class: crate::commit::tx::WriteClass::Standard,
4776                assertions: Vec::new(),
4777                read_set: crate::commit::tx::ReadSet::default(),
4778                write_intent: crate::commit::tx::WriteIntent { mutations },
4779                base_seq,
4780            })
4781            .await?;
4782        for result in &mut results {
4783            result.seq = committed.commit_seq;
4784        }
4785        Ok(DdlBatchResult {
4786            seq: committed.commit_seq,
4787            results,
4788        })
4789    }
4790
4791    pub fn add_lifecycle_hook(&self, hook: Arc<dyn LifecycleHook>) {
4792        self.lifecycle_hooks.lock().push(hook);
4793    }
4794
4795    pub fn remove_lifecycle_hook(&self, hook: &Arc<dyn LifecycleHook>) {
4796        let mut hooks = self.lifecycle_hooks.lock();
4797        hooks.retain(|existing| !Arc::ptr_eq(existing, hook));
4798    }
4799
4800    pub fn add_telemetry_hook(&self, hook: Arc<dyn QueryCommitTelemetryHook>) {
4801        self.telemetry_hooks.lock().push(hook);
4802    }
4803
4804    pub fn remove_telemetry_hook(&self, hook: &Arc<dyn QueryCommitTelemetryHook>) {
4805        let mut hooks = self.telemetry_hooks.lock();
4806        hooks.retain(|existing| !Arc::ptr_eq(existing, hook));
4807    }
4808
4809    pub async fn backup_full_to_remote(
4810        &self,
4811        adapter: &dyn RemoteBackupAdapter,
4812        uri: &str,
4813    ) -> Result<BackupManifest, AedbError> {
4814        let temp = tempfile::tempdir()?;
4815        let manifest = self.backup_full(temp.path()).await?;
4816        adapter.store_backup_dir(uri, temp.path())?;
4817        Ok(manifest)
4818    }
4819
4820    pub fn restore_from_remote(
4821        adapter: &dyn RemoteBackupAdapter,
4822        uri: &str,
4823        data_dir: &Path,
4824        config: &AedbConfig,
4825    ) -> Result<u64, AedbError> {
4826        let temp = tempfile::tempdir()?;
4827        let chain = adapter.materialize_backup_chain(uri, temp.path())?;
4828        Self::restore_from_backup_chain(&chain, data_dir, config, None)
4829    }
4830
4831    pub async fn create_project(&self, project_id: &str) -> Result<(), AedbError> {
4832        self.commit_ddl(DdlOperation::CreateProject {
4833            owner_id: None,
4834            project_id: project_id.to_string(),
4835            if_not_exists: true,
4836        })
4837        .await?;
4838        Ok(())
4839    }
4840
4841    pub async fn drop_project(&self, project_id: &str) -> Result<(), AedbError> {
4842        self.commit_ddl(DdlOperation::DropProject {
4843            project_id: project_id.to_string(),
4844            if_exists: true,
4845        })
4846        .await?;
4847        Ok(())
4848    }
4849
4850    pub async fn create_scope(&self, project_id: &str, scope_id: &str) -> Result<(), AedbError> {
4851        self.commit_ddl(DdlOperation::CreateScope {
4852            owner_id: None,
4853            project_id: project_id.to_string(),
4854            scope_id: scope_id.to_string(),
4855            if_not_exists: true,
4856        })
4857        .await?;
4858        Ok(())
4859    }
4860
4861    pub async fn drop_scope(&self, project_id: &str, scope_id: &str) -> Result<(), AedbError> {
4862        self.commit_ddl(DdlOperation::DropScope {
4863            project_id: project_id.to_string(),
4864            scope_id: scope_id.to_string(),
4865            if_exists: true,
4866        })
4867        .await?;
4868        Ok(())
4869    }
4870
4871    pub async fn enable_kv_projection(
4872        &self,
4873        project_id: &str,
4874        scope_id: &str,
4875    ) -> Result<(), AedbError> {
4876        self.commit(Mutation::Ddl(DdlOperation::EnableKvProjection {
4877            project_id: project_id.to_string(),
4878            scope_id: scope_id.to_string(),
4879        }))
4880        .await?;
4881        Ok(())
4882    }
4883
4884    pub async fn disable_kv_projection(
4885        &self,
4886        project_id: &str,
4887        scope_id: &str,
4888    ) -> Result<(), AedbError> {
4889        self.commit(Mutation::Ddl(DdlOperation::DisableKvProjection {
4890            project_id: project_id.to_string(),
4891            scope_id: scope_id.to_string(),
4892        }))
4893        .await?;
4894        Ok(())
4895    }
4896
4897    pub async fn list_scopes(&self, project_id: &str) -> Result<Vec<String>, AedbError> {
4898        let (_, catalog, _) = self.executor.snapshot_state().await;
4899        Ok(catalog.list_scopes(project_id))
4900    }
4901
4902    pub async fn project_exists(&self, project_id: &str) -> Result<bool, AedbError> {
4903        let (_, catalog, _) = self.executor.snapshot_state().await;
4904        Ok(catalog.projects.contains_key(project_id))
4905    }
4906
4907    pub async fn scope_exists(&self, project_id: &str, scope_id: &str) -> Result<bool, AedbError> {
4908        let (_, catalog, _) = self.executor.snapshot_state().await;
4909        Ok(catalog
4910            .scopes
4911            .contains_key(&(project_id.to_string(), scope_id.to_string())))
4912    }
4913
4914    pub async fn table_exists(
4915        &self,
4916        project_id: &str,
4917        scope_id: &str,
4918        table_name: &str,
4919    ) -> Result<bool, AedbError> {
4920        let (_, catalog, _) = self.executor.snapshot_state().await;
4921        Ok(catalog
4922            .tables
4923            .contains_key(&(namespace_key(project_id, scope_id), table_name.to_string())))
4924    }
4925
4926    pub async fn index_exists(
4927        &self,
4928        project_id: &str,
4929        scope_id: &str,
4930        table_name: &str,
4931        index_name: &str,
4932    ) -> Result<bool, AedbError> {
4933        let (_, catalog, _) = self.executor.snapshot_state().await;
4934        Ok(catalog.indexes.contains_key(&(
4935            namespace_key(project_id, scope_id),
4936            table_name.to_string(),
4937            index_name.to_string(),
4938        )))
4939    }
4940
4941    pub async fn list_projects(&self) -> Result<Vec<ProjectInfo>, AedbError> {
4942        let (_, catalog, _) = self.executor.snapshot_state().await;
4943        let mut infos: Vec<ProjectInfo> = catalog
4944            .projects
4945            .values()
4946            .map(|project| {
4947                let scope_count = catalog
4948                    .scopes
4949                    .values()
4950                    .filter(|scope| scope.project_id == project.project_id)
4951                    .count() as u32;
4952                ProjectInfo {
4953                    project_id: project.project_id.clone(),
4954                    scope_count,
4955                    created_at_micros: project.created_at_micros,
4956                }
4957            })
4958            .collect();
4959        infos.sort_by(|a, b| a.project_id.cmp(&b.project_id));
4960        Ok(infos)
4961    }
4962
4963    pub async fn list_scopes_info(&self, project_id: &str) -> Result<Vec<ScopeInfo>, AedbError> {
4964        let (snapshot, catalog, _) = self.executor.snapshot_state().await;
4965        let mut infos: Vec<ScopeInfo> = catalog
4966            .scopes
4967            .values()
4968            .filter(|scope| scope.project_id == project_id)
4969            .map(|scope| {
4970                let table_count = catalog
4971                    .tables
4972                    .values()
4973                    .filter(|table| {
4974                        table.project_id == scope.project_id && table.scope_id == scope.scope_id
4975                    })
4976                    .count() as u32;
4977                let kv_key_count = snapshot
4978                    .namespaces
4979                    .get(&NamespaceId::project_scope(project_id, &scope.scope_id))
4980                    .map_or(0, |ns| ns.kv.entries.len() as u64);
4981                ScopeInfo {
4982                    scope_id: scope.scope_id.clone(),
4983                    table_count,
4984                    kv_key_count,
4985                    created_at_micros: scope.created_at_micros,
4986                }
4987            })
4988            .collect();
4989        infos.sort_by(|a, b| a.scope_id.cmp(&b.scope_id));
4990        Ok(infos)
4991    }
4992
4993    pub async fn list_tables_info(
4994        &self,
4995        project_id: &str,
4996        scope_id: &str,
4997    ) -> Result<Vec<TableInfo>, AedbError> {
4998        let (snapshot, catalog, _) = self.executor.snapshot_state().await;
4999        let ns_key = namespace_key(project_id, scope_id);
5000        let ns_id = NamespaceId::project_scope(project_id, scope_id);
5001        let mut infos: Vec<TableInfo> = catalog
5002            .tables
5003            .values()
5004            .filter(|table| table.project_id == project_id && table.scope_id == scope_id)
5005            .map(|table| {
5006                let index_count = catalog
5007                    .indexes
5008                    .keys()
5009                    .filter(|(ns, name, _)| ns == &ns_key && name == &table.table_name)
5010                    .count() as u32;
5011                let row_count = snapshot
5012                    .namespaces
5013                    .get(&ns_id)
5014                    .and_then(|ns| ns.tables.get(&table.table_name))
5015                    .map_or(0, |t| t.rows.len() as u64);
5016                TableInfo {
5017                    table_name: table.table_name.clone(),
5018                    column_count: table.columns.len() as u32,
5019                    index_count,
5020                    row_count,
5021                }
5022            })
5023            .collect();
5024        infos.sort_by(|a, b| a.table_name.cmp(&b.table_name));
5025        Ok(infos)
5026    }
5027
5028    pub async fn set_read_policy(
5029        &self,
5030        project_id: &str,
5031        scope_id: &str,
5032        table_name: &str,
5033        predicate: Expr,
5034    ) -> Result<(), AedbError> {
5035        self.commit(Mutation::Ddl(DdlOperation::SetReadPolicy {
5036            project_id: project_id.to_string(),
5037            scope_id: scope_id.to_string(),
5038            table_name: table_name.to_string(),
5039            predicate,
5040            actor_id: None,
5041        }))
5042        .await?;
5043        Ok(())
5044    }
5045
5046    pub async fn clear_read_policy(
5047        &self,
5048        project_id: &str,
5049        scope_id: &str,
5050        table_name: &str,
5051    ) -> Result<(), AedbError> {
5052        self.commit(Mutation::Ddl(DdlOperation::ClearReadPolicy {
5053            project_id: project_id.to_string(),
5054            scope_id: scope_id.to_string(),
5055            table_name: table_name.to_string(),
5056            actor_id: None,
5057        }))
5058        .await?;
5059        Ok(())
5060    }
5061
5062    pub async fn transfer_ownership(
5063        &self,
5064        resource_type: ResourceType,
5065        project_id: &str,
5066        scope_id: Option<&str>,
5067        table_name: Option<&str>,
5068        new_owner_id: &str,
5069    ) -> Result<CommitResult, AedbError> {
5070        self.commit(Mutation::Ddl(DdlOperation::TransferOwnership {
5071            resource_type,
5072            project_id: project_id.to_string(),
5073            scope_id: scope_id.map(ToString::to_string),
5074            table_name: table_name.map(ToString::to_string),
5075            new_owner_id: new_owner_id.to_string(),
5076            actor_id: None,
5077        }))
5078        .await
5079    }
5080
5081    pub async fn list_tables(
5082        &self,
5083        project_id: &str,
5084        scope_id: &str,
5085    ) -> Result<Vec<TableSchema>, AedbError> {
5086        let (_, catalog, _) = self.executor.snapshot_state().await;
5087        Ok(catalog
5088            .tables
5089            .values()
5090            .filter(|t| t.project_id == project_id && t.scope_id == scope_id)
5091            .cloned()
5092            .collect())
5093    }
5094
5095    pub async fn describe_table(
5096        &self,
5097        project_id: &str,
5098        scope_id: &str,
5099        table: &str,
5100    ) -> Result<TableSchema, AedbError> {
5101        let (_, catalog, _) = self.executor.snapshot_state().await;
5102        catalog
5103            .tables
5104            .get(&(namespace_key(project_id, scope_id), table.to_string()))
5105            .cloned()
5106            .ok_or_else(|| AedbError::NotFound {
5107                resource_type: ErrorResourceType::Table,
5108                resource_id: format!("{project_id}.{scope_id}.{table}"),
5109            })
5110    }
5111
5112    pub async fn list_indexes(
5113        &self,
5114        project_id: &str,
5115        scope_id: &str,
5116        table_name: &str,
5117    ) -> Result<Vec<IndexDef>, AedbError> {
5118        let (_, catalog, _) = self.executor.snapshot_state().await;
5119        let mut out: Vec<IndexDef> = catalog
5120            .indexes
5121            .values()
5122            .filter(|idx| {
5123                idx.project_id == project_id
5124                    && idx.scope_id == scope_id
5125                    && idx.table_name == table_name
5126            })
5127            .cloned()
5128            .collect();
5129        out.sort_by(|a, b| a.index_name.cmp(&b.index_name));
5130        Ok(out)
5131    }
5132
5133    pub async fn describe_index(
5134        &self,
5135        project_id: &str,
5136        scope_id: &str,
5137        table_name: &str,
5138        index_name: &str,
5139    ) -> Result<IndexDef, AedbError> {
5140        let (_, catalog, _) = self.executor.snapshot_state().await;
5141        catalog
5142            .indexes
5143            .get(&(
5144                namespace_key(project_id, scope_id),
5145                table_name.to_string(),
5146                index_name.to_string(),
5147            ))
5148            .cloned()
5149            .ok_or_else(|| AedbError::NotFound {
5150                resource_type: ErrorResourceType::Index,
5151                resource_id: format!("{project_id}.{scope_id}.{table_name}.{index_name}"),
5152            })
5153    }
5154
5155    pub async fn list_async_indexes(
5156        &self,
5157        project_id: &str,
5158        scope_id: &str,
5159        table_name: &str,
5160    ) -> Result<Vec<AsyncIndexDef>, AedbError> {
5161        let (_, catalog, _) = self.executor.snapshot_state().await;
5162        let mut out: Vec<AsyncIndexDef> = catalog
5163            .async_indexes
5164            .values()
5165            .filter(|idx| {
5166                idx.project_id == project_id
5167                    && idx.scope_id == scope_id
5168                    && idx.table_name == table_name
5169            })
5170            .cloned()
5171            .collect();
5172        out.sort_by(|a, b| a.index_name.cmp(&b.index_name));
5173        Ok(out)
5174    }
5175
5176    pub async fn describe_async_index(
5177        &self,
5178        project_id: &str,
5179        scope_id: &str,
5180        table_name: &str,
5181        index_name: &str,
5182    ) -> Result<AsyncIndexDef, AedbError> {
5183        let (_, catalog, _) = self.executor.snapshot_state().await;
5184        catalog
5185            .async_indexes
5186            .get(&(
5187                namespace_key(project_id, scope_id),
5188                table_name.to_string(),
5189                index_name.to_string(),
5190            ))
5191            .cloned()
5192            .ok_or_else(|| AedbError::NotFound {
5193                resource_type: ErrorResourceType::Index,
5194                resource_id: format!("{project_id}.{scope_id}.{table_name}.{index_name}"),
5195            })
5196    }
5197
    /// Flush outstanding WAL writes and persist a final checkpoint.
    ///
    /// Call before dropping the handle so a fresh checkpoint is left on disk.
    /// Order matters: fsync first so the checkpoint covers the flushed tail.
    pub async fn shutdown(&self) -> Result<(), AedbError> {
        // Ensure batch durability tails are flushed before checkpointing shutdown state.
        let _ = self.force_fsync().await?;
        let _ = self.checkpoint_now().await?;
        Ok(())
    }
5204
    /// Write a checkpoint anchored at the current durable head and publish a
    /// signed manifest referencing it. Returns the sequence the checkpoint
    /// covers.
    pub async fn checkpoint_now(&self) -> Result<u64, AedbError> {
        // Serialize checkpoints while allowing normal commits/queries to continue.
        let _checkpoint_guard = self.checkpoint_lock.lock().await;

        // In batch durability mode, flush un-synced WAL tail so checkpoint captures a
        // stable durable horizon and recovery does not lose committed tail entries.
        let _ = self.force_fsync().await?;

        // Anchor checkpoint to a stable committed horizon.
        let seq = self.executor.durable_head_seq_now();
        let lease = self.acquire_snapshot(ConsistencyMode::AtSeq(seq)).await?;
        let snapshot = lease.view.keyspace.as_ref();
        let catalog = lease.view.catalog.as_ref();
        // Keep only idempotency records at or below the anchor seq so the
        // checkpointed state is self-consistent.
        let mut idempotency = self.executor.idempotency_snapshot().await;
        idempotency.retain(|_, record| record.commit_seq <= seq);
        let checkpoint = write_checkpoint_with_key(
            snapshot,
            catalog,
            seq,
            &self.dir,
            self._config.checkpoint_key(),
            self._config.checkpoint_key_id.clone(),
            idempotency,
            self._config.checkpoint_compression_level,
        )?;

        // Collect the WAL segments relevant to this checkpoint; the active
        // segment defaults to just past the anchor when none are present.
        let segments = read_segments_for_checkpoint(&self.dir, seq)?;
        let active_segment_seq = segments
            .last()
            .map(|segment| segment.segment_seq)
            .unwrap_or(seq.saturating_add(1));
        let manifest = Manifest {
            durable_seq: seq,
            visible_seq: seq,
            active_segment_seq,
            checkpoints: vec![checkpoint],
            segments,
        };
        // Atomic, signed publish: readers never observe a partial manifest.
        write_manifest_atomic_signed(&manifest, &self.dir, self._config.hmac_key())?;
        Ok(seq)
    }
5246
    /// Creates a full hot backup of the database into `backup_dir`.
    ///
    /// Writes a checkpoint of a pinned consistent snapshot, copies all current
    /// WAL segments under `wal_tail/`, then writes a signed backup manifest
    /// recording a SHA-256 digest for every file in the backup.
    ///
    /// # Errors
    /// Returns an error if snapshot acquisition, checkpoint writing, segment
    /// copying, hashing, or manifest writing fails.
    pub async fn backup_full(&self, backup_dir: &Path) -> Result<BackupManifest, AedbError> {
        create_private_dir_all(backup_dir)?;
        create_private_dir_all(&backup_dir.join("wal_tail"))?;

        // Hot backup: pin a consistent read view without blocking the write path.
        let lease = self.acquire_snapshot(ConsistencyMode::AtLatest).await?;
        let snapshot_seq = lease.view.seq;
        let idempotency = self.executor.idempotency_snapshot().await;
        // Materialize the pinned view as a checkpoint file directly in the
        // backup directory.
        let checkpoint = write_checkpoint_with_key(
            lease.view.keyspace.as_ref(),
            lease.view.catalog.as_ref(),
            snapshot_seq,
            backup_dir,
            self._config.checkpoint_key(),
            self._config.checkpoint_key_id.clone(),
            idempotency,
            self._config.checkpoint_compression_level,
        )?;

        let mut wal_segments = Vec::new();
        let mut file_sha256 = HashMap::new();
        file_sha256.insert(
            checkpoint.filename.clone(),
            sha256_file_hex(&backup_dir.join(&checkpoint.filename))?,
        );

        // Copy every live WAL segment into the backup and record its digest.
        // NOTE(review): segments may still be appended to concurrently, so a
        // copy can contain entries past `snapshot_seq`; restore bounds replay
        // by the manifest's `wal_head_seq`. TODO confirm torn-tail handling
        // in replay for a segment copied mid-write.
        for segment in read_segments(&self.dir)? {
            let src = self.dir.join(&segment.filename);
            let rel = format!("wal_tail/{}", segment.filename);
            let dst = backup_dir.join(&rel);
            fs::copy(src, &dst)?;
            wal_segments.push(segment.filename);
            file_sha256.insert(rel, sha256_file_hex(&dst)?);
        }

        // Keep segments ordered by their encoded sequence number for replay.
        wal_segments.sort_by_key(|name| segment_seq_from_name(name).unwrap_or(0));
        let created_at_micros = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_micros() as u64;
        let manifest = BackupManifest {
            backup_id: format!("bk_{}_{}", created_at_micros, snapshot_seq),
            backup_type: "full".into(),
            parent_backup_id: None,
            from_seq: None,
            created_at_micros,
            aedb_version: env!("CARGO_PKG_VERSION").to_string(),
            checkpoint_seq: snapshot_seq,
            wal_head_seq: snapshot_seq,
            checkpoint_file: checkpoint.filename,
            wal_segments,
            file_sha256,
        };
        // Sign the manifest so restore can detect tampering.
        write_backup_manifest(backup_dir, &manifest, self._config.hmac_key())?;
        Ok(manifest)
    }
5303
5304    /// Creates a full backup and packs it into a single archive file.
5305    ///
5306    /// The archive is encrypted when `checkpoint_encryption_key` is configured.
5307    pub async fn backup_full_to_file(
5308        &self,
5309        backup_file: &Path,
5310    ) -> Result<BackupManifest, AedbError> {
5311        let temp = tempfile::tempdir()?;
5312        let manifest = self.backup_full(temp.path()).await?;
5313        write_backup_archive(temp.path(), backup_file, self._config.checkpoint_key())?;
5314        Ok(manifest)
5315    }
5316
    /// Creates an incremental backup containing the WAL segments newer than
    /// the parent backup's head sequence.
    ///
    /// The parent backup's manifest and files are re-verified first. Only
    /// segments whose entry-sequence range overlaps `[parent head + 1,
    /// current snapshot seq]` are copied; the checkpoint itself remains in
    /// the parent (full) backup and is only referenced here.
    ///
    /// # Errors
    /// Fails when the parent manifest cannot be loaded/verified, or when any
    /// scan, copy, hash, or manifest-write step fails.
    pub async fn backup_incremental(
        &self,
        backup_dir: &Path,
        parent_backup_dir: &Path,
    ) -> Result<BackupManifest, AedbError> {
        create_private_dir_all(backup_dir)?;
        create_private_dir_all(&backup_dir.join("wal_tail"))?;
        // Re-verify the parent so we never chain onto a corrupted backup.
        let parent = load_backup_manifest(parent_backup_dir, self._config.hmac_key())?;
        verify_backup_files(parent_backup_dir, &parent)?;
        let from_seq = parent.wal_head_seq.saturating_add(1);

        // Pin the current visible head as this incremental's upper bound.
        let lease = self.acquire_snapshot(ConsistencyMode::AtLatest).await?;
        let to_seq = lease.view.seq;
        let created_at_micros = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_micros() as u64;
        let mut wal_segments = Vec::new();
        let mut file_sha256 = HashMap::new();

        for segment in read_segments(&self.dir)? {
            let src = self.dir.join(&segment.filename);
            // Skip segments with no scannable sequence range (presumably
            // empty segments — TODO confirm scan semantics).
            let Some((min_seq, max_seq)) = scan_segment_seq_range(&src)? else {
                continue;
            };
            // Keep only segments overlapping the [from_seq, to_seq] window.
            if max_seq < from_seq || min_seq > to_seq {
                continue;
            }
            let rel = format!("wal_tail/{}", segment.filename);
            let dst = backup_dir.join(&rel);
            fs::copy(src, &dst)?;
            wal_segments.push(segment.filename);
            file_sha256.insert(rel, sha256_file_hex(&dst)?);
        }

        // Order by encoded segment sequence for deterministic replay.
        wal_segments.sort_by_key(|name| segment_seq_from_name(name).unwrap_or(0));
        let manifest = BackupManifest {
            backup_id: format!("bk_{}_{}", created_at_micros, to_seq),
            backup_type: "incremental".into(),
            parent_backup_id: Some(parent.backup_id),
            from_seq: Some(from_seq),
            created_at_micros,
            aedb_version: env!("CARGO_PKG_VERSION").to_string(),
            // The checkpoint still lives in the parent backup.
            checkpoint_seq: parent.checkpoint_seq,
            wal_head_seq: to_seq,
            checkpoint_file: parent.checkpoint_file,
            wal_segments,
            file_sha256,
        };
        write_backup_manifest(backup_dir, &manifest, self._config.hmac_key())?;
        Ok(manifest)
    }
5369
5370    pub fn restore_from_backup(
5371        backup_dir: &Path,
5372        data_dir: &Path,
5373        config: &AedbConfig,
5374    ) -> Result<u64, AedbError> {
5375        Self::restore_from_backup_chain(&[backup_dir.to_path_buf()], data_dir, config, None)
5376    }
5377
5378    /// Restores from a single-file full backup archive created by `backup_full_to_file`.
5379    pub fn restore_from_backup_file(
5380        backup_file: &Path,
5381        data_dir: &Path,
5382        config: &AedbConfig,
5383    ) -> Result<u64, AedbError> {
5384        let temp = tempfile::tempdir()?;
5385        extract_backup_archive(backup_file, temp.path(), config.checkpoint_key())?;
5386        Self::restore_from_backup(temp.path(), data_dir, config)
5387    }
5388
5389    pub fn restore_from_backup_chain(
5390        backup_dirs: &[PathBuf],
5391        data_dir: &Path,
5392        config: &AedbConfig,
5393        target_seq: Option<u64>,
5394    ) -> Result<u64, AedbError> {
5395        let chain = load_verified_backup_chain(backup_dirs, config)?;
5396        if data_dir.exists() && fs::read_dir(data_dir)?.next().is_some() {
5397            return Err(AedbError::Validation(
5398                "restore target directory must be empty".into(),
5399            ));
5400        }
5401        create_private_dir_all(data_dir)?;
5402        let effective_target =
5403            target_seq.unwrap_or(chain.last().expect("non-empty").1.wal_head_seq);
5404        let full = &chain[0].1;
5405        if effective_target < full.checkpoint_seq {
5406            return Err(AedbError::Validation(
5407                "target_seq is older than full checkpoint_seq".into(),
5408            ));
5409        }
5410
5411        let checkpoint_path = resolve_backup_path(&chain[0].0, &full.checkpoint_file)?;
5412        let (mut keyspace, mut catalog, checkpoint_seq, mut idempotency) =
5413            load_checkpoint_with_key(&checkpoint_path, config.checkpoint_key())?;
5414        let mut current_seq = checkpoint_seq;
5415        let mut last_verified_chain: Option<(u64, [u8; 32])> = None;
5416
5417        for (dir, manifest) in &chain {
5418            let replay_to = effective_target.min(manifest.wal_head_seq);
5419            if replay_to <= current_seq {
5420                continue;
5421            }
5422            let mut wal_paths = Vec::new();
5423            for seg in &manifest.wal_segments {
5424                wal_paths.push(resolve_backup_path(dir, &format!("wal_tail/{seg}"))?);
5425            }
5426            wal_paths.sort_by_key(|p| {
5427                p.file_name()
5428                    .and_then(|n| segment_seq_from_name(&n.to_string_lossy()))
5429                    .unwrap_or(0)
5430            });
5431            if config.hash_chain_required && !wal_paths.is_empty() {
5432                let first_seq = wal_paths
5433                    .first()
5434                    .and_then(|p| p.file_name())
5435                    .and_then(|n| segment_seq_from_name(&n.to_string_lossy()));
5436                let chain_anchor = match (last_verified_chain, first_seq) {
5437                    (Some((last_seq, last_hash)), Some(first_seq)) if first_seq > last_seq => {
5438                        Some(last_hash)
5439                    }
5440                    _ => None,
5441                };
5442                if let Some(last) = verify_hash_chain_batch(&wal_paths, chain_anchor)? {
5443                    last_verified_chain = Some(last);
5444                }
5445            }
5446            current_seq = replay_segments(
5447                &wal_paths,
5448                current_seq,
5449                Some(replay_to),
5450                None,
5451                false,
5452                config.strict_recovery(),
5453                &mut keyspace,
5454                &mut catalog,
5455                &mut idempotency,
5456            )?;
5457            if current_seq >= effective_target {
5458                break;
5459            }
5460        }
5461        let restored_seq = current_seq;
5462        let cp = write_checkpoint_with_key(
5463            &keyspace.snapshot(),
5464            &catalog,
5465            restored_seq,
5466            data_dir,
5467            config.checkpoint_key(),
5468            config.checkpoint_key_id.clone(),
5469            idempotency,
5470            config.checkpoint_compression_level,
5471        )?;
5472        let restored_manifest = Manifest {
5473            durable_seq: restored_seq,
5474            visible_seq: restored_seq,
5475            active_segment_seq: restored_seq + 1,
5476            checkpoints: vec![cp],
5477            segments: Vec::new(),
5478        };
5479        write_manifest_atomic_signed(&restored_manifest, data_dir, config.hmac_key())?;
5480        Ok(restored_seq)
5481    }
5482
5483    pub fn restore_from_backup_chain_at_time(
5484        backup_dirs: &[PathBuf],
5485        data_dir: &Path,
5486        config: &AedbConfig,
5487        target_time_micros: u64,
5488    ) -> Result<u64, AedbError> {
5489        let chain = load_verified_backup_chain(backup_dirs, config)?;
5490        let target_seq = resolve_target_seq_for_time(&chain, target_time_micros)?;
5491        Self::restore_from_backup_chain(backup_dirs, data_dir, config, Some(target_seq))
5492    }
5493
    /// Restores a single `(project_id, scope_id)` namespace from a backup
    /// chain into an existing live data directory, leaving all other
    /// namespaces untouched.
    ///
    /// Strategy: restore the full chain into a scratch directory, recover
    /// both the live and restored states, splice the target namespace's
    /// keyspace and catalog entries from the restored state into the live
    /// state, then re-checkpoint the merged state in place.
    ///
    /// # Errors
    /// Propagates chain verification, restore, recovery, checkpoint, and
    /// manifest-write failures.
    pub fn restore_namespace_from_backup_chain(
        backup_dirs: &[PathBuf],
        data_dir: &Path,
        config: &AedbConfig,
        project_id: &str,
        scope_id: &str,
        target_seq: Option<u64>,
    ) -> Result<u64, AedbError> {
        let chain = load_verified_backup_chain(backup_dirs, config)?;
        // NOTE(review): panics if the verified chain is empty — presumably
        // `load_verified_backup_chain` rejects empty input; confirm.
        let effective_target =
            target_seq.unwrap_or(chain.last().expect("non-empty").1.wal_head_seq);
        // Current on-disk state; everything except the target namespace is
        // retained from here.
        let live = recover_with_config(data_dir, config)?;

        // Full restore into a scratch directory, then recover it to obtain
        // the backup's state at the target sequence.
        let temp = tempfile::tempdir()?;
        let _ = Self::restore_from_backup_chain(
            backup_dirs,
            temp.path(),
            config,
            Some(effective_target),
        )?;
        let restored = recover_with_config(temp.path(), config)?;

        let ns_key = namespace_key(project_id, scope_id);
        let ns_id = NamespaceId::project_scope(project_id, scope_id);

        // Keyspace: keep every live namespace except the target, then take
        // the target namespace (if present) from the restored state.
        let mut merged_namespaces = live
            .keyspace
            .namespaces
            .iter()
            .filter(|(ns, _)| **ns != ns_id)
            .map(|(ns, namespace)| (ns.clone(), namespace.clone()))
            .collect::<HashMap<_, _>>();
        if let Some(namespace) = restored.keyspace.namespaces.get(&ns_id) {
            merged_namespaces.insert(ns_id.clone(), namespace.clone());
        }
        // Same splice for async-index runtime state keyed by namespace.
        let mut merged_async_indexes = live
            .keyspace
            .async_indexes
            .iter()
            .filter(|(key, _)| key.0 != ns_id)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect::<HashMap<_, _>>();
        for (key, value) in restored.keyspace.async_indexes.iter() {
            if key.0 == ns_id {
                merged_async_indexes.insert(key.clone(), value.clone());
            }
        }
        let merged_keyspace = Keyspace {
            primary_index_backend: live.keyspace.primary_index_backend,
            namespaces: Arc::new(merged_namespaces.into()),
            async_indexes: Arc::new(merged_async_indexes.into()),
        };

        // Catalog: take the restored project entry when present, and make the
        // scope entry match the restored state exactly (insert or remove).
        let mut merged_catalog = live.catalog.clone();
        if let Some(project) = restored.catalog.projects.get(project_id) {
            merged_catalog
                .projects
                .insert(project_id.to_string(), project.clone());
        }
        let scope_key = (project_id.to_string(), scope_id.to_string());
        match restored.catalog.scopes.get(&scope_key) {
            Some(scope) => {
                merged_catalog
                    .scopes
                    .insert(scope_key.clone(), scope.clone());
            }
            None => {
                merged_catalog.scopes.remove(&scope_key);
            }
        }
        // Tables: drop every live table in the namespace, then re-add the
        // restored ones, so tables deleted since the backup also disappear.
        let table_keys: Vec<(String, String)> = merged_catalog
            .tables
            .keys()
            .filter(|(ns, _)| ns == &ns_key)
            .cloned()
            .collect();
        for key in table_keys {
            merged_catalog.tables.remove(&key);
        }
        for (key, table) in restored.catalog.tables.iter() {
            if key.0 == ns_key {
                merged_catalog.tables.insert(key.clone(), table.clone());
            }
        }

        // Same remove-then-readd splice for index definitions.
        let index_keys: Vec<(String, String, String)> = merged_catalog
            .indexes
            .keys()
            .filter(|(ns, _, _)| ns == &ns_key)
            .cloned()
            .collect();
        for key in index_keys {
            merged_catalog.indexes.remove(&key);
        }
        for (key, index) in restored.catalog.indexes.iter() {
            if key.0 == ns_key {
                merged_catalog.indexes.insert(key.clone(), index.clone());
            }
        }

        // ...and for async-index definitions.
        let async_index_keys: Vec<(String, String, String)> = merged_catalog
            .async_indexes
            .keys()
            .filter(|(ns, _, _)| ns == &ns_key)
            .cloned()
            .collect();
        for key in async_index_keys {
            merged_catalog.async_indexes.remove(&key);
        }
        for (key, index) in restored.catalog.async_indexes.iter() {
            if key.0 == ns_key {
                merged_catalog
                    .async_indexes
                    .insert(key.clone(), index.clone());
            }
        }

        // Advance to at least the restore target so the merged state never
        // regresses the live sequence.
        let merged_seq = live.current_seq.max(effective_target);
        let cp = write_checkpoint_with_key(
            &merged_keyspace.snapshot(),
            &merged_catalog,
            merged_seq,
            data_dir,
            config.checkpoint_key(),
            config.checkpoint_key_id.clone(),
            live.idempotency,
            config.checkpoint_compression_level,
        )?;
        let manifest = Manifest {
            durable_seq: merged_seq,
            visible_seq: merged_seq,
            active_segment_seq: merged_seq + 1,
            checkpoints: vec![cp],
            segments: Vec::new(),
        };
        write_manifest_atomic_signed(&manifest, data_dir, config.hmac_key())?;
        Ok(merged_seq)
    }
5632
5633    pub async fn snapshot_probe(&self, consistency: ConsistencyMode) -> Result<u64, AedbError> {
5634        let lease = self.acquire_snapshot(consistency).await?;
5635        Ok(lease.view.seq)
5636    }
5637
5638    pub fn metrics(&self) -> ExecutorMetrics {
5639        self.executor.metrics()
5640    }
5641
    /// Assembles an aggregated `OperationalMetrics` snapshot combining
    /// executor counters, runtime sequence state, and wrapper-level atomics.
    pub async fn operational_metrics(&self) -> OperationalMetrics {
        let core = self.executor.metrics();
        let runtime = self.executor.runtime_state_metrics().await;
        let now_micros = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_micros() as u64;
        // Age of the last full snapshot; saturating_sub guards clock skew.
        let snapshot_age_micros = now_micros.saturating_sub(runtime.last_full_snapshot_micros);
        // Guard the division: no commits yet means a 0.0 conflict rate.
        let conflict_rate = if core.commits_total == 0 {
            0.0
        } else {
            core.conflict_rejections as f64 / core.commits_total as f64
        };
        let durable_wait_ops = self.durable_wait_ops.load(Ordering::Relaxed);
        let durable_wait_micros = self.durable_wait_micros.load(Ordering::Relaxed);
        // Integer average; 0 when no durable waits have been recorded.
        let avg_durable_wait_micros = if durable_wait_ops == 0 {
            0
        } else {
            durable_wait_micros / durable_wait_ops
        };
        let upstream_validation_rejections =
            self.upstream_validation_rejections.load(Ordering::Relaxed);
        OperationalMetrics {
            commits_total: core.commits_total,
            commit_errors: core.commit_errors,
            permission_rejections: core.permission_rejections,
            // Validation rejections counted before reaching the executor are
            // folded into the executor's own count (saturating).
            validation_rejections: core
                .validation_rejections
                .saturating_add(upstream_validation_rejections),
            queue_full_rejections: core.queue_full_rejections,
            timeout_rejections: core.timeout_rejections,
            conflict_rejections: core.conflict_rejections,
            read_set_conflicts: core.read_set_conflicts,
            conflict_rate,
            avg_commit_latency_micros: core.avg_commit_latency_micros,
            coordinator_apply_attempts: core.coordinator_apply_attempts,
            avg_coordinator_apply_micros: core.avg_coordinator_apply_micros,
            wal_append_ops: core.wal_append_ops,
            wal_append_bytes: core.wal_append_bytes,
            avg_wal_append_micros: core.avg_wal_append_micros,
            wal_sync_ops: core.wal_sync_ops,
            avg_wal_sync_micros: core.avg_wal_sync_micros,
            prestage_validate_ops: core.prestage_validate_ops,
            avg_prestage_validate_micros: core.avg_prestage_validate_micros,
            epoch_process_ops: core.epoch_process_ops,
            avg_epoch_process_micros: core.avg_epoch_process_micros,
            durable_wait_ops,
            avg_durable_wait_micros,
            inflight_commits: core.inflight_commits,
            queue_depth: core.queued_commits,
            // How far the durable head trails the visible head.
            durable_head_lag: runtime
                .visible_head_seq
                .saturating_sub(runtime.durable_head_seq),
            visible_head_seq: runtime.visible_head_seq,
            durable_head_seq: runtime.durable_head_seq,
            current_seq: runtime.current_seq,
            snapshot_age_micros,
            startup_recovery_micros: self.startup_recovery_micros,
            startup_recovered_seq: self.startup_recovered_seq,
        }
    }
5703
5704    /// Returns an in-memory footprint estimate (bytes) for current keyspace state.
5705    ///
5706    /// This is an estimate based on row/KV/index payload sizes and is intended for
5707    /// policy and safety heuristics (for example, retention memory-pressure offload).
5708    pub async fn estimated_memory_bytes(&self) -> usize {
5709        let (snapshot, _, _) = self.executor.snapshot_state().await;
5710        snapshot.estimate_memory_bytes()
5711    }
5712
5713    pub async fn wait_for_durable(&self, seq: u64) -> Result<(), AedbError> {
5714        self.executor.wait_for_durable(seq).await
5715    }
5716
5717    pub async fn force_fsync(&self) -> Result<u64, AedbError> {
5718        self.executor.force_fsync().await
5719    }
5720
5721    pub async fn apply_migrations(&self, mut migrations: Vec<Migration>) -> Result<(), AedbError> {
5722        migrations.sort_by_key(|m| m.version);
5723        for migration in migrations {
5724            self.apply_migration(migration).await?;
5725        }
5726        Ok(())
5727    }
5728
    /// Runs `migrations` (all targeting one project/scope) idempotently:
    /// already-applied versions are checksum-verified and skipped, the rest
    /// are applied in ascending version order.
    ///
    /// Returns a `MigrationReport` with per-version apply durations, skipped
    /// versions, and the resulting current version.
    ///
    /// # Errors
    /// - `InvalidConfig` when migrations target mixed project/scope pairs.
    /// - `IntegrityError` when a recorded migration's checksum differs from
    ///   the supplied definition.
    pub async fn run_migrations(
        &self,
        mut migrations: Vec<Migration>,
    ) -> Result<MigrationReport, AedbError> {
        migrations.sort_by_key(|m| m.version);
        if migrations.is_empty() {
            return Ok(MigrationReport {
                applied: Vec::new(),
                skipped: Vec::new(),
                current_version: 0,
            });
        }
        // All migrations must target the same namespace; take it from the
        // first and reject any mismatch.
        let project_id = migrations[0].project_id.clone();
        let scope_id = migrations[0].scope_id.clone();
        if migrations
            .iter()
            .any(|m| m.project_id != project_id || m.scope_id != scope_id)
        {
            return Err(AedbError::InvalidConfig {
                message: "all migrations in run_migrations must target the same project and scope"
                    .into(),
            });
        }
        let mut applied = Vec::new();
        let mut skipped = Vec::new();
        // Index previously applied records by version for O(1) lookups.
        let existing = self
            .list_applied_migrations(&project_id, &scope_id)
            .await?
            .into_iter()
            .map(|record| (record.version, record))
            .collect::<HashMap<_, _>>();
        for migration in migrations {
            if let Some(record) = existing.get(&migration.version) {
                // Already applied: only verify the definition is unchanged.
                let checksum = checksum_hex(&migration)?;
                if checksum != record.checksum_hex {
                    return Err(AedbError::IntegrityError {
                        message: format!(
                            "migration checksum mismatch for version {}",
                            migration.version
                        ),
                    });
                }
                skipped.push(migration.version);
                continue;
            }
            let started = Instant::now();
            let version = migration.version;
            let name = migration.name.clone();
            self.apply_migration(migration).await?;
            applied.push((version, name, started.elapsed()));
        }
        let current_version = self.current_version(&project_id, &scope_id).await?;
        Ok(MigrationReport {
            applied,
            skipped,
            current_version,
        })
    }
5787
    /// Applies a single migration atomically together with its bookkeeping
    /// record, retrying on optimistic-concurrency conflicts.
    ///
    /// Idempotent: when a record for this version already exists, the call
    /// succeeds if checksums match and returns `IntegrityError` if the stored
    /// definition differs from the supplied one.
    pub async fn apply_migration(&self, migration: Migration) -> Result<(), AedbError> {
        const MAX_RETRIES_ON_CONFLICT: usize = 8;

        let checksum = checksum_hex(&migration)?;
        let key = migration_key(migration.version);
        for attempt in 0..=MAX_RETRIES_ON_CONFLICT {
            // Fast path: already applied (possibly by a concurrent caller).
            let (snapshot, _, _) = self.executor.snapshot_state().await;
            if let Some(existing) =
                snapshot.kv_get(&migration.project_id, &migration.scope_id, &key)
            {
                let record = decode_record(&existing.value)?;
                if record.checksum_hex != checksum {
                    return Err(AedbError::IntegrityError {
                        message: format!(
                            "migration checksum mismatch for version {}",
                            migration.version
                        ),
                    });
                }
                return Ok(());
            }

            // Commit the migration's mutations together with its record so
            // both land (or fail) atomically.
            let mut mutations = migration.mutations.clone();
            // Optimistic guess: if nothing commits in between, this commit
            // lands at current_seq + 1. A wrong guess makes base_seq stale,
            // surfacing as a Conflict that triggers a retry with fresh state.
            let predicted_applied_seq = self.executor.current_seq().await + 1;
            let record = MigrationRecord {
                version: migration.version,
                name: migration.name.clone(),
                project_id: migration.project_id.clone(),
                scope_id: migration.scope_id.clone(),
                applied_at_micros: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_micros() as u64,
                applied_seq: predicted_applied_seq,
                checksum_hex: checksum.clone(),
            };
            mutations.push(Mutation::KvSet {
                project_id: migration.project_id.clone(),
                scope_id: migration.scope_id.clone(),
                key: key.clone(),
                value: encode_record(&record)?,
            });
            let base_seq = predicted_applied_seq.saturating_sub(1);
            match self
                .commit_envelope(TransactionEnvelope {
                    caller: None,
                    idempotency_key: None,
                    write_class: crate::commit::tx::WriteClass::Standard,
                    assertions: Vec::new(),
                    read_set: crate::commit::tx::ReadSet::default(),
                    write_intent: crate::commit::tx::WriteIntent { mutations },
                    base_seq,
                })
                .await
            {
                Ok(_) => return Ok(()),
                Err(AedbError::Conflict(_)) if attempt < MAX_RETRIES_ON_CONFLICT => continue,
                Err(err) => {
                    // On any other error, re-check whether a racing caller
                    // already applied this version; if so treat as success.
                    let (snapshot, _, _) = self.executor.snapshot_state().await;
                    if let Some(existing) =
                        snapshot.kv_get(&migration.project_id, &migration.scope_id, &key)
                    {
                        let record = decode_record(&existing.value)?;
                        if record.checksum_hex != checksum {
                            return Err(AedbError::IntegrityError {
                                message: format!(
                                    "migration checksum mismatch for version {}",
                                    migration.version
                                ),
                            });
                        }
                        return Ok(());
                    }
                    return Err(err);
                }
            }
        }

        // All retry attempts were consumed by conflicts.
        Err(AedbError::Conflict(format!(
            "migration {} could not be applied due to repeated conflicts",
            migration.version
        )))
    }
5871
5872    pub async fn list_applied_migrations(
5873        &self,
5874        project_id: &str,
5875        scope_id: &str,
5876    ) -> Result<Vec<MigrationRecord>, AedbError> {
5877        let (snapshot, _, _) = self.executor.snapshot_state().await;
5878        let ns = NamespaceId::project_scope(project_id, scope_id);
5879        let mut out = Vec::new();
5880        if let Some(namespace) = snapshot.namespaces.get(&ns) {
5881            for (k, v) in &namespace.kv.entries {
5882                if k.starts_with(b"__migrations/") {
5883                    out.push(decode_record(&v.value)?);
5884                }
5885            }
5886        }
5887        out.sort_by_key(|r| r.version);
5888        Ok(out)
5889    }
5890
5891    pub async fn applied_migrations(
5892        &self,
5893        project_id: &str,
5894        scope_id: &str,
5895    ) -> Result<Vec<MigrationRecord>, AedbError> {
5896        self.list_applied_migrations(project_id, scope_id).await
5897    }
5898
5899    pub async fn current_version(
5900        &self,
5901        project_id: &str,
5902        scope_id: &str,
5903    ) -> Result<u64, AedbError> {
5904        Ok(self
5905            .list_applied_migrations(project_id, scope_id)
5906            .await?
5907            .last()
5908            .map_or(0, |record| record.version))
5909    }
5910
    /// Rolls back applied migrations, newest first, down to (and keeping)
    /// `target_version_inclusive`.
    ///
    /// Each rolled-back version commits its `down_mutations` together with
    /// the deletion of the migration record in one transaction.
    ///
    /// # Errors
    /// Returns `Validation` when a version to roll back has no matching
    /// definition in `available_migrations` or lacks `down_mutations`;
    /// commit failures are propagated.
    pub async fn rollback_to_migration(
        &self,
        project_id: &str,
        scope_id: &str,
        target_version_inclusive: u64,
        mut available_migrations: Vec<Migration>,
    ) -> Result<(), AedbError> {
        let applied = self.list_applied_migrations(project_id, scope_id).await?;
        available_migrations.sort_by_key(|m| m.version);
        // Walk applied records from newest to oldest, stopping at the target.
        for record in applied.into_iter().rev() {
            if record.version <= target_version_inclusive {
                break;
            }
            let migration = available_migrations
                .iter()
                .find(|m| m.version == record.version)
                .ok_or_else(|| {
                    AedbError::Validation(format!(
                        "rollback migration definition missing for version {}",
                        record.version
                    ))
                })?;
            let Some(down) = &migration.down_mutations else {
                return Err(AedbError::Validation(format!(
                    "migration {} has no down_mutations",
                    record.version
                )));
            };
            // Apply the down mutations and drop the record in one commit.
            let mut mutations = down.clone();
            mutations.push(Mutation::KvDel {
                project_id: project_id.to_string(),
                scope_id: scope_id.to_string(),
                key: migration_key(record.version),
            });
            let base_seq = self.executor.current_seq().await;
            self.commit_envelope(TransactionEnvelope {
                caller: None,
                idempotency_key: None,
                write_class: crate::commit::tx::WriteClass::Standard,
                assertions: Vec::new(),
                read_set: crate::commit::tx::ReadSet::default(),
                write_intent: crate::commit::tx::WriteIntent { mutations },
                base_seq,
            })
            .await?;
        }
        Ok(())
    }
5959
5960    pub async fn backfill_table(
5961        &self,
5962        project_id: &str,
5963        scope_id: &str,
5964        table_name: &str,
5965        update: fn(&crate::catalog::types::Row) -> Option<crate::catalog::types::Row>,
5966    ) -> Result<u64, AedbError> {
5967        self.backfill_table_batched(project_id, scope_id, table_name, usize::MAX, update)
5968            .await
5969    }
5970
    /// Rewrites rows of a table through `update`, committing each changed
    /// row individually; returns the number of rows rewritten.
    ///
    /// `update` returns `Some(new_row)` to replace a row or `None` to leave
    /// it untouched.
    ///
    /// NOTE(review): rows are grouped into `batch_size` chunks, but each
    /// changed row is still committed separately — the chunking does not
    /// batch commits. Confirm whether per-chunk commits were intended.
    pub async fn backfill_table_batched(
        &self,
        project_id: &str,
        scope_id: &str,
        table_name: &str,
        batch_size: usize,
        update: fn(&crate::catalog::types::Row) -> Option<crate::catalog::types::Row>,
    ) -> Result<u64, AedbError> {
        // Guard against a zero batch size, which would make chunks() panic.
        let batch_size = batch_size.max(1);
        let (snapshot, catalog, _) = self.executor.snapshot_state().await;
        let schema = catalog
            .tables
            .get(&(namespace_key(project_id, scope_id), table_name.to_string()))
            .ok_or_else(|| AedbError::Validation("table not found".into()))?
            .clone();
        let table = snapshot
            .table(project_id, scope_id, table_name)
            .ok_or_else(|| AedbError::Validation("table not found".into()))?;
        // Materialize rows up front so commits don't mutate what we iterate.
        let rows: Vec<crate::catalog::types::Row> = table.rows.values().cloned().collect();
        let mut updated = 0u64;
        for chunk in rows.chunks(batch_size) {
            for row in chunk {
                if let Some(new_row) = update(row) {
                    // Build the primary-key values from the *new* row by
                    // resolving each PK column name to its column index.
                    // (Resolved per changed row; loop-invariant and could be
                    // hoisted — left as-is to preserve lazy error behavior.)
                    let primary_key = schema
                        .primary_key
                        .iter()
                        .map(|pk_name| {
                            let column_index = schema
                                .columns
                                .iter()
                                .position(|c| c.name == *pk_name)
                                .ok_or_else(|| {
                                    AedbError::Validation(format!(
                                        "primary key column missing: {pk_name}"
                                    ))
                                })?;
                            Ok(new_row.values[column_index].clone())
                        })
                        .collect::<Result<Vec<_>, AedbError>>()?;
                    self.commit(Mutation::Upsert {
                        project_id: project_id.to_string(),
                        scope_id: scope_id.to_string(),
                        table_name: table_name.to_string(),
                        primary_key,
                        row: new_row,
                    })
                    .await?;
                    updated += 1;
                }
            }
        }
        Ok(updated)
    }
6024
6025    pub async fn backfill_table_batched_resumable(
6026        &self,
6027        project_id: &str,
6028        scope_id: &str,
6029        table_name: &str,
6030        batch_size: usize,
6031        progress_key: &[u8],
6032        update: fn(&crate::catalog::types::Row) -> Option<crate::catalog::types::Row>,
6033    ) -> Result<u64, AedbError> {
6034        let batch_size = batch_size.max(1);
6035        let (snapshot, catalog, _) = self.executor.snapshot_state().await;
6036        let schema = catalog
6037            .tables
6038            .get(&(namespace_key(project_id, scope_id), table_name.to_string()))
6039            .ok_or_else(|| AedbError::Validation("table not found".into()))?
6040            .clone();
6041        let table = snapshot
6042            .table(project_id, scope_id, table_name)
6043            .ok_or_else(|| AedbError::Validation("table not found".into()))?;
6044        let rows: Vec<crate::catalog::types::Row> = table.rows.values().cloned().collect();
6045        let start_offset = snapshot
6046            .kv_get(project_id, scope_id, progress_key)
6047            .and_then(|e| std::str::from_utf8(&e.value).ok()?.parse::<usize>().ok())
6048            .unwrap_or(0);
6049        let mut updated = 0u64;
6050        let start_offset = start_offset.min(rows.len());
6051        for (chunk_index, chunk) in rows[start_offset..].chunks(batch_size).enumerate() {
6052            for row in chunk {
6053                if let Some(new_row) = update(row) {
6054                    let primary_key = schema
6055                        .primary_key
6056                        .iter()
6057                        .map(|pk_name| {
6058                            let column_index = schema
6059                                .columns
6060                                .iter()
6061                                .position(|c| c.name == *pk_name)
6062                                .ok_or_else(|| {
6063                                    AedbError::Validation(format!(
6064                                        "primary key column missing: {pk_name}"
6065                                    ))
6066                                })?;
6067                            Ok(new_row.values[column_index].clone())
6068                        })
6069                        .collect::<Result<Vec<_>, AedbError>>()?;
6070                    self.commit(Mutation::Upsert {
6071                        project_id: project_id.to_string(),
6072                        scope_id: scope_id.to_string(),
6073                        table_name: table_name.to_string(),
6074                        primary_key,
6075                        row: new_row,
6076                    })
6077                    .await?;
6078                    updated += 1;
6079                }
6080            }
6081            let progressed =
6082                start_offset + ((chunk_index + 1) * batch_size).min(rows.len() - start_offset);
6083            self.commit(Mutation::KvSet {
6084                project_id: project_id.to_string(),
6085                scope_id: scope_id.to_string(),
6086                key: progress_key.to_vec(),
6087                value: progressed.to_string().into_bytes(),
6088            })
6089            .await?;
6090        }
6091        Ok(updated)
6092    }
6093
    /// Returns the commit executor's current head state (thin delegation to
    /// `CommitExecutor::head_state`).
    pub async fn head_state(&self) -> crate::commit::executor::HeadState {
        self.executor.head_state().await
    }
6097}
6098
impl ReadTx<'_> {
    /// Sequence number of the snapshot this read transaction is pinned to.
    pub fn snapshot_seq(&self) -> u64 {
        self.lease.view.seq
    }

    /// Runs `query` against the leased snapshot with default options.
    pub async fn query(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
    ) -> Result<QueryResult, QueryError> {
        self.query_with_options(project_id, scope_id, query, QueryOptions::default())
            .await
    }

    /// Runs `query` with caller-supplied options, then emits query telemetry.
    ///
    /// The options' consistency mode is always overridden to the lease's
    /// snapshot sequence: a read transaction never reads past its snapshot.
    pub async fn query_with_options(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
        mut options: QueryOptions,
    ) -> Result<QueryResult, QueryError> {
        options.consistency = ConsistencyMode::AtSeq(self.lease.view.seq);
        let started = Instant::now();
        // The query is consumed by execution, so keep the table name for the
        // telemetry call afterwards.
        let table = query.table.clone();
        let result = execute_query_against_view(
            &self.lease.view,
            project_id,
            scope_id,
            query,
            &options,
            self.caller.as_ref(),
            self.db._config.max_scan_rows,
        );
        self.db.emit_query_telemetry(
            started,
            project_id,
            scope_id,
            &table,
            self.lease.view.seq,
            &result,
        );
        result
    }

    /// Runs several queries sequentially against the same leased snapshot,
    /// returning results in input order. Fails fast on the first error.
    pub async fn query_batch(
        &self,
        project_id: &str,
        scope_id: &str,
        items: Vec<QueryBatchItem>,
    ) -> Result<Vec<QueryResult>, QueryError> {
        if items.is_empty() {
            return Ok(Vec::new());
        }
        let mut out = Vec::with_capacity(items.len());
        for item in items {
            out.push(
                self.query_with_options(project_id, scope_id, item.query, item.options)
                    .await?,
            );
        }
        Ok(out)
    }

    /// Returns `true` if `query` matches at least one row.
    ///
    /// The query's limit is forced to 1 so execution can stop at the first
    /// matching row.
    pub async fn exists(
        &self,
        project_id: &str,
        scope_id: &str,
        mut query: Query,
    ) -> Result<bool, QueryError> {
        query.limit = Some(1);
        let result = self.query(project_id, scope_id, query).await?;
        Ok(!result.rows.is_empty())
    }

    /// Produces planning diagnostics for `query` at this transaction's
    /// snapshot without returning any rows.
    pub fn explain(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
        mut options: QueryOptions,
    ) -> Result<QueryDiagnostics, QueryError> {
        // Same consistency pinning as query_with_options, so the plan
        // reflects exactly what execution would see.
        options.consistency = ConsistencyMode::AtSeq(self.lease.view.seq);
        explain_query_against_view(
            &self.lease.view,
            project_id,
            scope_id,
            query,
            &options,
            self.caller.as_ref(),
            self.db._config.max_scan_rows,
        )
    }

    /// Explains and then executes `query`, bundling the diagnostics with the
    /// result. The query and options are cloned for the explain pass.
    pub async fn query_with_diagnostics(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
        options: QueryOptions,
    ) -> Result<QueryWithDiagnosticsResult, QueryError> {
        let diagnostics = self.explain(project_id, scope_id, query.clone(), options.clone())?;
        let result = self
            .query_with_options(project_id, scope_id, query, options)
            .await?;
        Ok(QueryWithDiagnosticsResult {
            result,
            diagnostics,
        })
    }

    /// Cursor-based pagination with a stable row order.
    ///
    /// The query's ordering is augmented from the catalog so that paging is
    /// deterministic, the page size is clamped to at least 1, and reads are
    /// pinned to the lease's snapshot so successive pages are consistent.
    pub async fn query_page_stable(
        &self,
        project_id: &str,
        scope_id: &str,
        query: Query,
        cursor: Option<String>,
        page_size: usize,
    ) -> Result<QueryResult, QueryError> {
        let query =
            ensure_stable_order_from_catalog(project_id, scope_id, &self.lease.view.catalog, query);
        self.query_with_options(
            project_id,
            scope_id,
            query.limit(page_size.max(1)),
            QueryOptions {
                consistency: ConsistencyMode::AtSeq(self.lease.view.seq),
                cursor,
                ..QueryOptions::default()
            },
        )
        .await
    }

    /// Returns one page of rows together with the total match count.
    ///
    /// Only plain (non-aggregate, non-grouped) queries are accepted. The
    /// total is computed by running a derived `Count` aggregate over the same
    /// table/joins/predicate. When `offset` is supplied, offset-based paging
    /// is used instead of cursor paging; otherwise `cursor` + stable ordering
    /// drive pagination via [`Self::query_page_stable`].
    pub async fn list_with_total(
        &self,
        project_id: &str,
        scope_id: &str,
        mut query: Query,
        cursor: Option<String>,
        offset: Option<usize>,
        page_size: usize,
    ) -> Result<ListPageResult, QueryError> {
        if !query.aggregates.is_empty() || !query.group_by.is_empty() || query.having.is_some() {
            return Err(QueryError::InvalidQuery {
                reason: "list_with_total expects non-aggregate query".into(),
            });
        }
        query =
            ensure_stable_order_from_catalog(project_id, scope_id, &self.lease.view.catalog, query);
        // Derived count query: same shape as `query` but with ordering/limit
        // stripped and a single Count aggregate.
        let count_query = Query {
            select: vec!["count_star".into()],
            table: query.table.clone(),
            table_alias: query.table_alias.clone(),
            joins: query.joins.clone(),
            predicate: query.predicate.clone(),
            order_by: Vec::new(),
            limit: None,
            group_by: Vec::new(),
            aggregates: vec![crate::query::plan::Aggregate::Count],
            having: None,
            use_index: query.use_index.clone(),
        };
        let count_result = self
            .query_with_options(
                project_id,
                scope_id,
                count_query,
                QueryOptions {
                    allow_full_scan: true,
                    ..QueryOptions::default()
                },
            )
            .await?;
        // First value of the first row carries the count; anything else
        // (or a negative/overflowing value) degrades to 0.
        let total_count = count_result
            .rows
            .first()
            .and_then(|row| row.values.first())
            .and_then(|value| match value {
                Value::Integer(v) => usize::try_from(*v).ok(),
                _ => None,
            })
            .unwrap_or(0);

        if let Some(offset) = offset {
            // Offset path: fetch the full (unlimited) result and window it
            // in memory with skip/take. No cursor is produced for this mode.
            let mut full_query = query.clone();
            full_query.limit = None;
            let full = self
                .query_with_options(
                    project_id,
                    scope_id,
                    full_query,
                    QueryOptions {
                        allow_full_scan: true,
                        ..QueryOptions::default()
                    },
                )
                .await?;
            let rows = full
                .rows
                .into_iter()
                .skip(offset)
                .take(page_size.max(1))
                .collect::<Vec<_>>();
            return Ok(ListPageResult {
                rows,
                total_count,
                next_cursor: None,
                snapshot_seq: full.snapshot_seq,
                rows_examined: full.rows_examined,
            });
        }

        let page = self
            .query_page_stable(project_id, scope_id, query, cursor, page_size)
            .await?;
        Ok(ListPageResult {
            rows: page.rows,
            total_count,
            next_cursor: page.cursor,
            snapshot_seq: page.snapshot_seq,
            rows_examined: page.rows_examined,
        })
    }

    /// Two-phase lookup: runs `source_query`, collects the value at
    /// `source_key_index` from each source row, then fetches ("hydrates") all
    /// rows of `hydrate_query` whose `hydrate_key_column` is IN that key set.
    ///
    /// The hydrate side is paged internally until exhausted, so the returned
    /// hydrated result is complete (no cursor, not truncated). Both results
    /// are read at the same leased snapshot. Source rows lacking a value at
    /// `source_key_index` are silently skipped.
    pub async fn lookup_then_hydrate(
        &self,
        project_id: &str,
        scope_id: &str,
        source_query: Query,
        source_key_index: usize,
        hydrate_query: Query,
        hydrate_key_column: &str,
    ) -> Result<(QueryResult, QueryResult), QueryError> {
        let mut source = self.query(project_id, scope_id, source_query).await?;
        // This helper treats the source query's limit as the caller's complete key set.
        // Do not surface pagination state from the underlying query engine here.
        source.cursor = None;
        source.truncated = false;
        let keys = source
            .rows
            .iter()
            .filter_map(|row| row.values.get(source_key_index).cloned())
            .collect::<Vec<_>>();
        if keys.is_empty() {
            // Nothing to hydrate: return an empty, well-formed result pinned
            // to the same snapshot.
            return Ok((
                source,
                QueryResult {
                    rows: Vec::new(),
                    rows_examined: 0,
                    cursor: None,
                    truncated: false,
                    snapshot_seq: self.lease.view.seq,
                    materialized_seq: None,
                },
            ));
        }
        // AND the IN(key set) constraint onto any predicate the caller
        // already supplied on the hydrate query.
        let predicate = Expr::In(hydrate_key_column.to_string(), keys);
        let hydrate_query = Query {
            predicate: Some(match hydrate_query.predicate {
                Some(existing) => Expr::And(Box::new(existing), Box::new(predicate)),
                None => predicate,
            }),
            ..hydrate_query
        };
        // Internal page size: at most 100, never above max_scan_rows, at least 1.
        let page_size = self.db._config.max_scan_rows.min(100).max(1);
        let mut hydrate_query = ensure_stable_order_from_catalog(
            project_id,
            scope_id,
            &self.lease.view.catalog,
            hydrate_query,
        );
        hydrate_query.limit = Some(page_size);

        let mut all_rows = Vec::new();
        let mut total_rows_examined = 0usize;
        let mut next_cursor: Option<String> = None;
        let mut materialized_seq = None;
        // Follow cursors until the hydrate result set is exhausted,
        // accumulating rows and examined-row counts across pages.
        loop {
            let page = self
                .query_with_options(
                    project_id,
                    scope_id,
                    hydrate_query.clone(),
                    QueryOptions {
                        consistency: ConsistencyMode::AtSeq(self.lease.view.seq),
                        cursor: next_cursor.clone(),
                        ..QueryOptions::default()
                    },
                )
                .await?;
            total_rows_examined = total_rows_examined.saturating_add(page.rows_examined);
            if materialized_seq.is_none() {
                materialized_seq = page.materialized_seq;
            }
            all_rows.extend(page.rows);
            if let Some(cursor) = page.cursor {
                next_cursor = Some(cursor);
                continue;
            }
            break;
        }
        let hydrated = QueryResult {
            rows: all_rows,
            rows_examined: total_rows_examined,
            cursor: None,
            truncated: false,
            snapshot_seq: self.lease.view.seq,
            materialized_seq,
        };
        Ok((source, hydrated))
    }
}