Skip to main content

sqlrite/
server.rs

1use crate::security::{AccessPolicy, AuditLogger};
2use crate::{
3    AccessContext, AccessOperation, AuditEvent, AuditExportFormat, AuditQuery, DurabilityProfile,
4    FailoverMode, HaRuntimeProfile, HaRuntimeState, JsonlAuditLogger, QueryProfile, RbacPolicy,
5    ReplicationLog, ReplicationLogEntry, Result, RuntimeConfig, ServerRole, SqlRite, SqlRiteError,
6    build_health_report, create_backup_snapshot, execute_sdk_query, execute_sdk_sql,
7    export_audit_events, list_backup_snapshots, prune_backup_snapshots,
8    restore_backup_file_verified, select_backup_snapshot_for_time,
9};
10use rusqlite::{Connection, OptionalExtension, params};
11use serde::Deserialize;
12use serde_json::{Value, json};
13use sqlrite_sdk_core::{QueryRequest as QueryApiRequest, SqlRequest as SqlApiRequest};
14use std::collections::{HashMap, VecDeque};
15use std::io::{BufRead, BufReader, Read, Write};
16use std::net::{TcpListener, TcpStream};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19use std::time::{SystemTime, UNIX_EPOCH};
20
/// Top-level configuration for the HTTP server started by
/// `serve_health_endpoints`.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Socket address the listener binds to (passed to `TcpListener::bind`).
    pub bind_addr: String,
    /// HA profile; validated at startup and used to seed the control-plane state.
    pub ha_profile: HaRuntimeProfile,
    /// Optional control API token.
    /// NOTE(review): presumably checked by `authorize_control_request`, which is
    /// not visible here — confirm.
    pub control_api_token: Option<String>,
    /// Presumably toggles the SQL endpoint; the check is not visible here — confirm.
    pub enable_sql_endpoint: bool,
    /// Security-related settings (RBAC policy, audit logging, ...).
    pub security: ServerSecurityConfig,
}
29
/// Security settings for the server.
///
/// Setting any of `secure_defaults`, `require_auth_context`, `policy` or
/// `audit_log_path` makes `enabled()` report `true`.
#[derive(Debug, Clone)]
pub struct ServerSecurityConfig {
    /// Off by default. NOTE(review): exact effect is implemented elsewhere — confirm.
    pub secure_defaults: bool,
    /// Off by default. Presumably requires requests to carry an auth context — confirm.
    pub require_auth_context: bool,
    /// Optional RBAC policy.
    pub policy: Option<RbacPolicy>,
    /// Audit log location; the audit export endpoint reads from this path and
    /// rejects the request when it is unset.
    pub audit_log_path: Option<PathBuf>,
    /// Field names to redact in audit output.
    /// NOTE(review): the redaction itself is not visible here — confirm.
    pub audit_redacted_fields: Vec<String>,
}
38
39impl Default for ServerSecurityConfig {
40    fn default() -> Self {
41        Self {
42            secure_defaults: false,
43            require_auth_context: false,
44            policy: None,
45            audit_log_path: None,
46            audit_redacted_fields: vec![
47                "statement".to_string(),
48                "query_embedding".to_string(),
49                "metadata_filters".to_string(),
50                "auth_token".to_string(),
51            ],
52        }
53    }
54}
55
56impl ServerSecurityConfig {
57    fn enabled(&self) -> bool {
58        self.secure_defaults
59            || self.require_auth_context
60            || self.policy.is_some()
61            || self.audit_log_path.is_some()
62    }
63}
64
65impl Default for ServerConfig {
66    fn default() -> Self {
67        Self {
68            bind_addr: "127.0.0.1:8099".to_string(),
69            ha_profile: HaRuntimeProfile::default(),
70            control_api_token: None,
71            enable_sql_endpoint: true,
72            security: ServerSecurityConfig::default(),
73        }
74    }
75}
76
/// Mutable control-plane state shared by all request handlers (wrapped in an
/// `Arc<Mutex<_>>` by `serve_health_endpoints`).
#[derive(Debug, Clone)]
struct ControlPlaneState {
    /// HA profile this node was started with.
    profile: HaRuntimeProfile,
    /// Runtime HA state, initialised from `profile`.
    runtime: HaRuntimeState,
    /// Replication log kept by this node.
    replication_log: ReplicationLog,
    /// Per-replica progress values.
    /// NOTE(review): presumably node id -> last acknowledged log index — confirm.
    replica_progress: HashMap<String, u64>,
    /// Failover / restore / chaos counters.
    resilience: ResilienceState,
    /// Currently injected chaos faults.
    chaos: ChaosHarnessState,
    /// Request counters and the recent-request trace buffer.
    observability: ObservabilityState,
}
87
88impl ControlPlaneState {
89    fn new(profile: HaRuntimeProfile) -> Self {
90        let runtime = HaRuntimeState::new(&profile);
91        Self {
92            profile,
93            runtime,
94            replication_log: ReplicationLog::new(),
95            replica_progress: HashMap::new(),
96            resilience: ResilienceState::default(),
97            chaos: ChaosHarnessState::default(),
98            observability: ObservabilityState::default(),
99        }
100    }
101
102    fn snapshot_json(&self) -> Value {
103        json!({
104            "profile": self.profile,
105            "state": self.runtime,
106            "replication": {
107                "log_len": self.replication_log.len(),
108                "last_log_index": self.replication_log.last_index(),
109                "last_log_term": self.replication_log.last_term(),
110                "replica_progress": self.replica_progress,
111            },
112            "resilience": self.resilience.to_json(self.chaos.active_count()),
113            "chaos": self.chaos.to_json(),
114            "observability": self.observability.to_json(),
115        })
116    }
117}
118
/// One entry of the in-memory request trace buffer, created by
/// `ObservabilityState::record_request`.
#[derive(Debug, Clone, serde::Serialize)]
struct QueryTraceRecord {
    /// Value of the request sequence counter when the request was recorded.
    seq: u64,
    /// Wall-clock time (`unix_ms_now()`) at which the record was created.
    timestamp_unix_ms: u64,
    /// HTTP method of the request.
    method: String,
    /// Path component of the request target, as returned by `split_path_and_query`.
    path: String,
    /// HTTP status code that was sent back.
    status: u16,
    /// Request handling time in milliseconds, as reported by the caller.
    duration_ms: u64,
}
128
/// Request counters, SQL latency aggregates and a bounded buffer of recent
/// request traces.
#[derive(Debug, Clone)]
struct ObservabilityState {
    /// Sequence counter stamped onto traces; not cleared by `reset`.
    request_seq: u64,
    /// Number of recorded requests.
    requests_total: u64,
    /// Recorded requests with status >= 500.
    requests_server_errors_total: u64,
    /// Recorded requests with status in 400..=499.
    requests_client_errors_total: u64,
    /// Recorded requests that hit one of the SQL/query routes.
    sql_requests_total: u64,
    /// SQL/query requests that ended with status >= 400.
    sql_requests_failed_total: u64,
    /// Sum of the durations of all SQL/query requests, in milliseconds.
    sql_latency_total_ms: u128,
    /// Largest single SQL/query request duration, in milliseconds.
    sql_latency_max_ms: u64,
    /// Counter exposed in metrics; it is updated outside the code visible here
    /// and is not cleared by `reset`.
    alert_simulations_total: u64,
    /// Most recent request traces, oldest at the front.
    traces: VecDeque<QueryTraceRecord>,
    /// Maximum number of traces kept in `traces`.
    trace_capacity: usize,
}
143
144impl Default for ObservabilityState {
145    fn default() -> Self {
146        Self {
147            request_seq: 0,
148            requests_total: 0,
149            requests_server_errors_total: 0,
150            requests_client_errors_total: 0,
151            sql_requests_total: 0,
152            sql_requests_failed_total: 0,
153            sql_latency_total_ms: 0,
154            sql_latency_max_ms: 0,
155            alert_simulations_total: 0,
156            traces: VecDeque::new(),
157            trace_capacity: 512,
158        }
159    }
160}
161
162impl ObservabilityState {
163    fn record_request(&mut self, method: &str, path: &str, status: u16, duration_ms: u64) {
164        self.request_seq = self.request_seq.saturating_add(1);
165        self.requests_total = self.requests_total.saturating_add(1);
166        if status >= 500 {
167            self.requests_server_errors_total = self.requests_server_errors_total.saturating_add(1);
168        } else if status >= 400 {
169            self.requests_client_errors_total = self.requests_client_errors_total.saturating_add(1);
170        }
171
172        let (raw_path, _) = split_path_and_query(path);
173        if matches!(
174            raw_path,
175            "/v1/sql"
176                | "/v1/query"
177                | "/v1/query-compact"
178                | "/grpc/sqlrite.v1.QueryService/Query"
179                | "/grpc/sqlrite.v1.QueryService/Sql"
180        ) {
181            self.sql_requests_total = self.sql_requests_total.saturating_add(1);
182            self.sql_latency_total_ms = self
183                .sql_latency_total_ms
184                .saturating_add(duration_ms as u128);
185            self.sql_latency_max_ms = self.sql_latency_max_ms.max(duration_ms);
186            if status >= 400 {
187                self.sql_requests_failed_total = self.sql_requests_failed_total.saturating_add(1);
188            }
189        }
190
191        self.traces.push_back(QueryTraceRecord {
192            seq: self.request_seq,
193            timestamp_unix_ms: unix_ms_now(),
194            method: method.to_string(),
195            path: raw_path.to_string(),
196            status,
197            duration_ms,
198        });
199        while self.traces.len() > self.trace_capacity {
200            let _ = self.traces.pop_front();
201        }
202    }
203
204    fn sql_avg_latency_ms(&self) -> f64 {
205        if self.sql_requests_total == 0 {
206            return 0.0;
207        }
208        self.sql_latency_total_ms as f64 / self.sql_requests_total as f64
209    }
210
211    fn recent_traces(&self, limit: usize) -> Vec<QueryTraceRecord> {
212        let take = limit.max(1).min(self.trace_capacity);
213        self.traces.iter().rev().take(take).cloned().collect()
214    }
215
216    fn to_json(&self) -> Value {
217        json!({
218            "requests_total": self.requests_total,
219            "requests_server_errors_total": self.requests_server_errors_total,
220            "requests_client_errors_total": self.requests_client_errors_total,
221            "sql_requests_total": self.sql_requests_total,
222            "sql_requests_failed_total": self.sql_requests_failed_total,
223            "sql_avg_latency_ms": self.sql_avg_latency_ms(),
224            "sql_latency_max_ms": self.sql_latency_max_ms,
225            "alert_simulations_total": self.alert_simulations_total,
226            "trace_buffered": self.traces.len(),
227            "trace_capacity": self.trace_capacity,
228        })
229    }
230
231    fn reset(&mut self) {
232        self.requests_total = 0;
233        self.requests_server_errors_total = 0;
234        self.requests_client_errors_total = 0;
235        self.sql_requests_total = 0;
236        self.sql_requests_failed_total = 0;
237        self.sql_latency_total_ms = 0;
238        self.sql_latency_max_ms = 0;
239        self.traces.clear();
240    }
241}
242
/// Fault scenarios that can be injected through the chaos harness.
///
/// (De)serialised in snake_case, i.e. `node_crash`, `disk_full`,
/// `partition_subset`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
enum ChaosScenario {
    NodeCrash,
    DiskFull,
    PartitionSubset,
}
250
251impl ChaosScenario {
252    fn as_str(self) -> &'static str {
253        match self {
254            Self::NodeCrash => "node_crash",
255            Self::DiskFull => "disk_full",
256            Self::PartitionSubset => "partition_subset",
257        }
258    }
259}
260
/// Description of one active chaos fault, as stored by
/// `ChaosHarnessState::inject`.
#[derive(Debug, Clone, Default, serde::Serialize)]
struct ChaosFaultState {
    /// Scenario name (`ChaosScenario::as_str`).
    scenario: String,
    /// Wall-clock time of the injection, in Unix milliseconds.
    injected_at_unix_ms: u64,
    /// Lifetime of the fault; `None` means it never expires on its own.
    duration_ms: Option<u64>,
    /// Free-form note taken from the inject request.
    note: Option<String>,
    /// Node list taken from the inject request (empty when omitted).
    blocked_nodes: Vec<String>,
}
269
/// Set of currently injected chaos faults, at most one per scenario.
#[derive(Debug, Clone, Default)]
struct ChaosHarnessState {
    /// Active faults keyed by scenario; injecting a scenario again replaces
    /// its previous entry.
    active_faults: HashMap<ChaosScenario, ChaosFaultState>,
}
274
275impl ChaosHarnessState {
276    fn inject(&mut self, request: ChaosInjectRequest) {
277        let scenario = request.scenario;
278        self.active_faults.insert(
279            scenario,
280            ChaosFaultState {
281                scenario: scenario.as_str().to_string(),
282                injected_at_unix_ms: unix_ms_now(),
283                duration_ms: request.duration_ms,
284                note: request.note,
285                blocked_nodes: request.blocked_nodes.unwrap_or_default(),
286            },
287        );
288    }
289
290    fn clear(&mut self, scenario: Option<ChaosScenario>) {
291        if let Some(value) = scenario {
292            self.active_faults.remove(&value);
293        } else {
294            self.active_faults.clear();
295        }
296    }
297
298    fn cleanup_expired(&mut self, now_ms: u64) {
299        self.active_faults.retain(|_, fault| {
300            let Some(duration_ms) = fault.duration_ms else {
301                return true;
302            };
303            now_ms.saturating_sub(fault.injected_at_unix_ms) < duration_ms
304        });
305    }
306
307    fn has(&self, scenario: ChaosScenario) -> bool {
308        self.active_faults.contains_key(&scenario)
309    }
310
311    fn active_count(&self) -> usize {
312        self.active_faults.len()
313    }
314
315    fn to_json(&self) -> Value {
316        let faults = self
317            .active_faults
318            .values()
319            .cloned()
320            .map(|fault| serde_json::to_value(fault).unwrap_or(Value::Null))
321            .collect::<Vec<_>>();
322        json!({
323            "active_fault_count": faults.len(),
324            "faults": faults,
325        })
326    }
327}
328
/// Counters and timings for failover, restore and chaos activity.
#[derive(Debug, Clone, Default)]
struct ResilienceState {
    /// Number of failovers started.
    failover_events_total: u64,
    /// Number of failovers completed.
    failover_completed_total: u64,
    /// Start time (Unix ms) of the failover in progress, if any.
    active_failover_started_unix_ms: Option<u64>,
    /// Duration of the most recently completed failover, in milliseconds.
    last_failover_duration_ms: Option<u64>,
    /// Sum of all completed failover durations, in milliseconds.
    cumulative_failover_duration_ms: u128,
    /// Number of restores started.
    restore_events_total: u64,
    /// Number of restores completed.
    restore_completed_total: u64,
    /// Start time (Unix ms) of the restore in progress, if any.
    active_restore_started_unix_ms: Option<u64>,
    /// Duration of the most recently completed restore, in milliseconds.
    last_restore_duration_ms: Option<u64>,
    /// Sum of all completed restore durations, in milliseconds.
    cumulative_restore_duration_ms: u128,
    /// Exposed in metrics; updated outside the code visible here.
    chaos_injections_total: u64,
    /// Exposed in metrics; updated outside the code visible here.
    chaos_blocked_requests_total: u64,
}
344
345impl ResilienceState {
346    fn start_failover(&mut self) {
347        if self.active_failover_started_unix_ms.is_none() {
348            self.failover_events_total = self.failover_events_total.saturating_add(1);
349            self.active_failover_started_unix_ms = Some(unix_ms_now());
350        }
351    }
352
353    fn complete_failover(&mut self) -> Option<u64> {
354        let start = self.active_failover_started_unix_ms.take()?;
355        let duration = unix_ms_now().saturating_sub(start);
356        self.failover_completed_total = self.failover_completed_total.saturating_add(1);
357        self.last_failover_duration_ms = Some(duration);
358        self.cumulative_failover_duration_ms = self
359            .cumulative_failover_duration_ms
360            .saturating_add(duration as u128);
361        Some(duration)
362    }
363
364    fn start_restore(&mut self) {
365        if self.active_restore_started_unix_ms.is_none() {
366            self.restore_events_total = self.restore_events_total.saturating_add(1);
367            self.active_restore_started_unix_ms = Some(unix_ms_now());
368        }
369    }
370
371    fn complete_restore(&mut self) -> Option<u64> {
372        let start = self.active_restore_started_unix_ms.take()?;
373        let duration = unix_ms_now().saturating_sub(start);
374        self.restore_completed_total = self.restore_completed_total.saturating_add(1);
375        self.last_restore_duration_ms = Some(duration);
376        self.cumulative_restore_duration_ms = self
377            .cumulative_restore_duration_ms
378            .saturating_add(duration as u128);
379        Some(duration)
380    }
381
382    fn avg_failover_duration_ms(&self) -> f64 {
383        if self.failover_completed_total == 0 {
384            return 0.0;
385        }
386        self.cumulative_failover_duration_ms as f64 / self.failover_completed_total as f64
387    }
388
389    fn avg_restore_duration_ms(&self) -> f64 {
390        if self.restore_completed_total == 0 {
391            return 0.0;
392        }
393        self.cumulative_restore_duration_ms as f64 / self.restore_completed_total as f64
394    }
395
396    fn to_json(&self, active_chaos_faults: usize) -> Value {
397        json!({
398            "failover_events_total": self.failover_events_total,
399            "failover_completed_total": self.failover_completed_total,
400            "active_failover_started_unix_ms": self.active_failover_started_unix_ms,
401            "last_failover_duration_ms": self.last_failover_duration_ms,
402            "avg_failover_duration_ms": self.avg_failover_duration_ms(),
403            "restore_events_total": self.restore_events_total,
404            "restore_completed_total": self.restore_completed_total,
405            "active_restore_started_unix_ms": self.active_restore_started_unix_ms,
406            "last_restore_duration_ms": self.last_restore_duration_ms,
407            "avg_restore_duration_ms": self.avg_restore_duration_ms(),
408            "chaos_injections_total": self.chaos_injections_total,
409            "chaos_blocked_requests_total": self.chaos_blocked_requests_total,
410            "chaos_active_faults": active_chaos_faults,
411        })
412    }
413}
414
/// Minimal parsed HTTP request, as produced by `read_http_request`.
#[derive(Debug)]
struct HttpRequest {
    /// Request method token from the request line (e.g. `GET`).
    method: String,
    /// Request target from the request line; still includes any query string.
    path: String,
    /// Headers keyed by lower-cased name, with trimmed values. A repeated
    /// header keeps only its last value.
    headers: HashMap<String, String>,
    /// Raw body bytes; empty when no usable Content-Length was sent.
    body: Vec<u8>,
}
422
/// Optional JSON request body carrying a leader id.
/// NOTE(review): presumably used by the promote endpoint; the handler is not
/// visible here — confirm.
#[derive(Debug, Deserialize, Default)]
struct PromoteRequest {
    leader_id: Option<String>,
}
427
/// Optional JSON request body carrying a leader id.
/// NOTE(review): presumably used by the step-down endpoint; the handler is not
/// visible here — confirm.
#[derive(Debug, Deserialize, Default)]
struct StepDownRequest {
    leader_id: Option<String>,
}
432
/// Optional JSON request body for a recovery operation; both fields may be
/// omitted. NOTE(review): the handler is not visible here — confirm usage.
#[derive(Debug, Deserialize, Default)]
struct RecoveryRequest {
    /// Presumably identifies the backup artifact involved — confirm.
    backup_artifact: Option<String>,
    /// Free-form operator note.
    note: Option<String>,
}
438
/// Optional JSON request body carrying only a free-form note.
/// NOTE(review): presumably used when a recovery is started — confirm.
#[derive(Debug, Deserialize, Default)]
struct RecoveryStartRequest {
    note: Option<String>,
}
443
/// Optional JSON request body carrying only a free-form note.
/// NOTE(review): presumably used when a backup snapshot is requested — confirm.
#[derive(Debug, Deserialize, Default)]
struct RecoverySnapshotRequest {
    note: Option<String>,
}
448
/// Optional JSON request body for a verified restore.
/// NOTE(review): the handler is not visible here; field meanings below are
/// inferred from names — confirm.
#[derive(Debug, Deserialize, Default)]
struct RecoveryVerifyRestoreRequest {
    /// Presumably an explicit snapshot file to restore from.
    snapshot_path: Option<String>,
    /// Presumably a point in time (Unix ms) used to select a snapshot.
    target_unix_ms: Option<u64>,
    /// Free-form operator note.
    note: Option<String>,
    /// Defaults to `false` when omitted from the JSON body.
    #[serde(default)]
    keep_artifact: bool,
}
457
/// Optional JSON request body for pruning backup snapshots.
/// NOTE(review): the handler is not visible here — confirm how
/// `retention_seconds` is applied.
#[derive(Debug, Deserialize, Default)]
struct RecoveryPruneRequest {
    retention_seconds: Option<u64>,
}
462
/// JSON request body describing an entry to append to the replication log.
/// NOTE(review): the handler is not visible here — confirm usage.
#[derive(Debug, Deserialize)]
struct ReplicationAppendRequest {
    /// Optional operation name.
    operation: Option<String>,
    /// Arbitrary JSON payload; JSON `null` when omitted.
    #[serde(default)]
    payload: Value,
}
469
/// JSON request body with which a leader ships log entries to this node.
/// NOTE(review): the handler is not visible here; field semantics are inferred
/// from names — confirm.
#[derive(Debug, Deserialize)]
struct ReplicationReceiveRequest {
    /// Sender's term.
    term: u64,
    /// Sender's node id.
    leader_id: String,
    /// Index of the log entry that precedes `entries`.
    prev_log_index: u64,
    /// Term of the log entry that precedes `entries`.
    prev_log_term: u64,
    /// Entries to append; empty when omitted.
    #[serde(default)]
    entries: Vec<ReplicationLogEntry>,
    /// Optional commit index reported by the sender.
    leader_commit: Option<u64>,
}
480
/// JSON request body with which a node acknowledges a log index.
/// NOTE(review): presumably feeds `replica_progress`; the handler is not
/// visible here — confirm.
#[derive(Debug, Deserialize)]
struct ReplicationAckRequest {
    node_id: String,
    index: u64,
}
486
/// Optional JSON request body for replication reconciliation; every field may
/// be omitted. NOTE(review): the handler is not visible here — confirm usage.
#[derive(Debug, Deserialize, Default)]
struct ReplicationReconcileRequest {
    node_id: Option<String>,
    last_applied_index: Option<u64>,
    commit_index: Option<u64>,
    replication_lag_ms: Option<u64>,
}
494
/// JSON request body with which a candidate asks this node for its vote.
/// All fields are required. NOTE(review): the handler is not visible here —
/// confirm the voting rules.
#[derive(Debug, Deserialize)]
struct ElectionVoteRequest {
    term: u64,
    candidate_id: String,
    candidate_last_log_index: u64,
    candidate_last_log_term: u64,
}
502
/// JSON request body of a leader heartbeat. `term`, `leader_id` and
/// `commit_index` are required; the remaining fields are optional.
/// NOTE(review): the handler is not visible here — confirm usage.
#[derive(Debug, Deserialize)]
struct ElectionHeartbeatRequest {
    term: u64,
    leader_id: String,
    commit_index: u64,
    leader_last_log_index: Option<u64>,
    replication_lag_ms: Option<u64>,
}
511
/// Optional JSON request body for an automatic-failover check.
/// NOTE(review): presumably mapped onto `AutoFailoverEvalInput`; the handler is
/// not visible here — confirm.
#[derive(Debug, Deserialize, Default)]
struct AutoFailoverCheckRequest {
    /// Defaults to `false` when omitted.
    #[serde(default)]
    force: bool,
    simulate_elapsed_ms: Option<u64>,
    reason: Option<String>,
}
519
/// JSON request body for injecting a chaos fault; consumed by
/// `ChaosHarnessState::inject`.
#[derive(Debug, Deserialize)]
struct ChaosInjectRequest {
    /// Scenario to activate (required).
    scenario: ChaosScenario,
    /// Lifetime of the fault in milliseconds; `None` means no automatic expiry.
    duration_ms: Option<u64>,
    /// Free-form note stored with the fault.
    note: Option<String>,
    /// Node list stored with the fault; treated as empty when omitted.
    blocked_nodes: Option<Vec<String>>,
}
527
/// Optional JSON request body for clearing chaos faults.
/// NOTE(review): presumably forwarded to `ChaosHarnessState::clear`, where
/// `None` clears every fault — confirm against the handler.
#[derive(Debug, Deserialize, Default)]
struct ChaosClearRequest {
    scenario: Option<ChaosScenario>,
}
532
/// Optional JSON request body for an alert simulation; every field may be
/// omitted. NOTE(review): the handler is not visible here — presumably the
/// values override the measured ones; confirm.
#[derive(Debug, Deserialize, Default)]
struct AlertSimulationRequest {
    sql_error_rate: Option<f64>,
    sql_avg_latency_ms: Option<f64>,
    replication_lag_ms: Option<u64>,
    restore_active_ms: Option<u64>,
}
540
/// Optional JSON request body of `POST /control/v1/security/audit/export`.
///
/// The filter fields are copied into an `AuditQuery`; `output_path` and
/// `format` control where and how the export is written.
#[derive(Debug, Deserialize, Default)]
struct SecurityAuditExportRequest {
    actor_id: Option<String>,
    tenant_id: Option<String>,
    operation: Option<AccessOperation>,
    allowed: Option<bool>,
    from_unix_ms: Option<u64>,
    to_unix_ms: Option<u64>,
    limit: Option<usize>,
    /// Optional destination path passed to `export_audit_events`.
    output_path: Option<String>,
    /// `"json"` or `"jsonl"`; JSONL when omitted, any other value is rejected
    /// with a 400 response.
    format: Option<String>,
}
553
/// Optional JSON request body for the rerank hook; every field may be omitted.
/// NOTE(review): the handler is not visible here — field semantics are inferred
/// from names only; confirm.
#[derive(Debug, Deserialize, Default)]
struct RerankHookRequest {
    query_text: Option<String>,
    query_embedding: Option<Vec<f32>>,
    candidate_count: Option<usize>,
    alpha: Option<f32>,
    candidate_limit: Option<usize>,
    query_profile: Option<String>,
    metadata_filters: Option<HashMap<String, String>>,
    doc_id: Option<String>,
}
565
566pub fn serve_health_endpoints(
567    db_path: impl AsRef<Path>,
568    runtime: RuntimeConfig,
569    config: ServerConfig,
570) -> Result<()> {
571    config
572        .ha_profile
573        .validate()
574        .map_err(std::io::Error::other)?;
575
576    let db_path = db_path.as_ref().to_path_buf();
577    let db = SqlRite::open_with_config(&db_path, runtime.clone())?;
578    let listener = TcpListener::bind(&config.bind_addr)?;
579
580    let mut control = ControlPlaneState::new(config.ha_profile.clone());
581    restore_replication_state(&db_path, &mut control)?;
582    let state = Arc::new(Mutex::new(control));
583
584    for stream in listener.incoming() {
585        let mut stream = match stream {
586            Ok(stream) => stream,
587            Err(_) => continue,
588        };
589
590        if let Err(error) = handle_connection(
591            &db,
592            &db_path,
593            runtime.durability_profile,
594            &config,
595            &state,
596            &mut stream,
597        ) {
598            let _ = write_response(
599                &mut stream,
600                500,
601                "text/plain; charset=utf-8",
602                &format!("internal error: {error}"),
603            );
604        }
605    }
606
607    Ok(())
608}
609
610fn handle_connection(
611    db: &SqlRite,
612    db_path: &Path,
613    sql_profile: DurabilityProfile,
614    config: &ServerConfig,
615    state: &Arc<Mutex<ControlPlaneState>>,
616    stream: &mut TcpStream,
617) -> Result<()> {
618    let started_unix_ms = unix_ms_now();
619    let request = read_http_request(stream)?;
620    let (status, content_type, body) =
621        build_response(db, db_path, sql_profile, config, state, &request)?;
622    write_response(stream, status, content_type, &body)?;
623    if let Ok(mut control) = state.lock() {
624        let duration_ms = unix_ms_now().saturating_sub(started_unix_ms);
625        control
626            .observability
627            .record_request(&request.method, &request.path, status, duration_ms);
628    }
629    Ok(())
630}
631
632fn read_http_request(stream: &TcpStream) -> Result<HttpRequest> {
633    let mut reader = BufReader::new(stream.try_clone()?);
634    let mut first_line = String::new();
635    reader.read_line(&mut first_line)?;
636
637    let (method, path) = parse_http_request_line(&first_line)
638        .ok_or_else(|| std::io::Error::other("invalid HTTP request line"))?;
639
640    let mut headers = HashMap::new();
641    loop {
642        let mut line = String::new();
643        let read = reader.read_line(&mut line)?;
644        if read == 0 || line == "\r\n" || line == "\n" {
645            break;
646        }
647        if let Some((name, value)) = line.split_once(':') {
648            headers.insert(
649                name.trim().to_ascii_lowercase(),
650                value.trim().trim_end_matches('\r').to_string(),
651            );
652        }
653    }
654
655    let content_length = headers
656        .get("content-length")
657        .and_then(|raw| raw.parse::<usize>().ok())
658        .unwrap_or(0);
659
660    let mut body = vec![0u8; content_length];
661    if content_length > 0 {
662        reader.read_exact(&mut body)?;
663    }
664
665    Ok(HttpRequest {
666        method: method.to_string(),
667        path: path.to_string(),
668        headers,
669        body,
670    })
671}
672
/// Extracts the method and request target from an HTTP request line.
///
/// The line is split on whitespace; the first two tokens are returned and any
/// further tokens (such as the HTTP version) are ignored. Returns `None` when
/// fewer than two tokens are present.
fn parse_http_request_line(first_line: &str) -> Option<(&str, &str)> {
    let mut tokens = first_line.split_whitespace();
    match (tokens.next(), tokens.next()) {
        (Some(method), Some(path)) => Some((method, path)),
        _ => None,
    }
}
679
680fn build_response(
681    db: &SqlRite,
682    db_path: &Path,
683    sql_profile: DurabilityProfile,
684    config: &ServerConfig,
685    state: &Arc<Mutex<ControlPlaneState>>,
686    request: &HttpRequest,
687) -> Result<(u16, &'static str, String)> {
688    let (path, query) = split_path_and_query(&request.path);
689    let now_ms = unix_ms_now();
690
691    let chaos_control_endpoint = matches!(
692        path,
693        "/control/v1/chaos/status" | "/control/v1/chaos/inject" | "/control/v1/chaos/clear"
694    );
695    {
696        let mut control = state
697            .lock()
698            .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
699        control.chaos.cleanup_expired(now_ms);
700        if !chaos_control_endpoint
701            && let Some(blocked) = chaos_blocking_response(&mut control, request, path)
702        {
703            return Ok(blocked);
704        }
705
706        let skip_auto_failover = matches!(
707            path,
708            "/control/v1/election/heartbeat"
709                | "/control/v1/replication/receive"
710                | "/control/v1/failover/auto-check"
711        );
712        if !skip_auto_failover {
713            let _ = maybe_trigger_automatic_failover(
714                db_path,
715                &mut control,
716                AutoFailoverEvalInput {
717                    force: false,
718                    simulated_elapsed_ms: None,
719                    reason: Some("periodic_request_tick".to_string()),
720                },
721            )?;
722        }
723    }
724
725    match (request.method.as_str(), path) {
726        ("GET", "/healthz") => {
727            let report = build_health_report(db)?;
728            let control = state
729                .lock()
730                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
731            let payload = json!({
732                "storage": report,
733                "ha": control.snapshot_json(),
734            });
735            Ok((200, "application/json", payload.to_string()))
736        }
737        ("GET", "/readyz") => {
738            let report = build_health_report(db)?;
739            let control = state
740                .lock()
741                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
742            let mut ready = report.integrity_check_ok;
743            if control.profile.replication.enabled
744                && control.runtime.role != ServerRole::Primary
745                && control.runtime.leader_id.is_none()
746            {
747                ready = false;
748            }
749
750            let status = if ready { 200 } else { 503 };
751            let payload = json!({
752                "ready": ready,
753                "schema_version": report.schema_version,
754                "ha_enabled": control.profile.replication.enabled,
755                "role": control.runtime.role,
756                "leader_id": control.runtime.leader_id,
757                "term": control.runtime.current_term,
758                "commit_index": control.runtime.commit_index,
759                "last_log_index": control.runtime.last_log_index,
760                "active_chaos_faults": control.chaos.active_count(),
761            });
762            Ok((status, "application/json", payload.to_string()))
763        }
764        ("GET", "/metrics") => {
765            let report = build_health_report(db)?;
766            let control = state
767                .lock()
768                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
769            let role_metric = match control.runtime.role {
770                ServerRole::Standalone => 0,
771                ServerRole::Primary => 1,
772                ServerRole::Replica => 2,
773            };
774
775            let body = format!(
776                "sqlrite_chunk_count {}\n\
777                 sqlrite_schema_version {}\n\
778                 sqlrite_index_entries {}\n\
779                 sqlrite_ha_enabled {}\n\
780                 sqlrite_ha_role {}\n\
781                 sqlrite_ha_term {}\n\
782                 sqlrite_ha_commit_index {}\n\
783                 sqlrite_ha_last_log_index {}\n\
784                 sqlrite_ha_last_log_term {}\n\
785                 sqlrite_ha_replication_log_entries {}\n\
786                 sqlrite_ha_replication_lag_ms {}\n\
787                 sqlrite_ha_failover_in_progress {}\n\
788                 sqlrite_ha_failover_events_total {}\n\
789                 sqlrite_ha_failover_completed_total {}\n\
790                 sqlrite_ha_failover_last_duration_ms {}\n\
791                 sqlrite_ha_failover_avg_duration_ms {}\n\
792                 sqlrite_ha_restore_events_total {}\n\
793                 sqlrite_ha_restore_completed_total {}\n\
794                 sqlrite_ha_restore_last_duration_ms {}\n\
795                 sqlrite_ha_restore_avg_duration_ms {}\n\
796                 sqlrite_ha_chaos_faults_active {}\n\
797                 sqlrite_ha_chaos_injections_total {}\n\
798                 sqlrite_ha_chaos_blocked_requests_total {}\n\
799                 sqlrite_requests_total {}\n\
800                 sqlrite_requests_server_errors_total {}\n\
801                 sqlrite_requests_client_errors_total {}\n\
802                 sqlrite_requests_sql_total {}\n\
803                 sqlrite_requests_sql_errors_total {}\n\
804                 sqlrite_requests_sql_avg_latency_ms {}\n\
805                 sqlrite_requests_sql_max_latency_ms {}\n\
806                 sqlrite_observability_traces_buffered {}\n\
807                 sqlrite_alert_simulations_total {}\n",
808                report.chunk_count,
809                report.schema_version,
810                report.vector_index_entries,
811                if control.profile.replication.enabled {
812                    1
813                } else {
814                    0
815                },
816                role_metric,
817                control.runtime.current_term,
818                control.runtime.commit_index,
819                control.runtime.last_log_index,
820                control.runtime.last_log_term,
821                control.replication_log.len(),
822                control.runtime.replication_lag_ms,
823                if control.runtime.failover_in_progress {
824                    1
825                } else {
826                    0
827                },
828                control.resilience.failover_events_total,
829                control.resilience.failover_completed_total,
830                control.resilience.last_failover_duration_ms.unwrap_or(0),
831                control.resilience.avg_failover_duration_ms(),
832                control.resilience.restore_events_total,
833                control.resilience.restore_completed_total,
834                control.resilience.last_restore_duration_ms.unwrap_or(0),
835                control.resilience.avg_restore_duration_ms(),
836                control.chaos.active_count(),
837                control.resilience.chaos_injections_total,
838                control.resilience.chaos_blocked_requests_total,
839                control.observability.requests_total,
840                control.observability.requests_server_errors_total,
841                control.observability.requests_client_errors_total,
842                control.observability.sql_requests_total,
843                control.observability.sql_requests_failed_total,
844                control.observability.sql_avg_latency_ms(),
845                control.observability.sql_latency_max_ms,
846                control.observability.traces.len(),
847                control.observability.alert_simulations_total,
848            );
849            Ok((200, "text/plain; version=0.0.4", body))
850        }
851        ("GET", "/control/v1/profile") => {
852            let control = state
853                .lock()
854                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
855            Ok((
856                200,
857                "application/json",
858                serde_json::to_string(&control.profile)?,
859            ))
860        }
861        ("GET", "/control/v1/security") => Ok((
862            200,
863            "application/json",
864            security_summary_json(config).to_string(),
865        )),
866        ("POST", "/control/v1/security/audit/export") => {
867            if !authorize_control_request(request, config) {
868                return Ok(unauthorized_response());
869            }
870            let Some(audit_path) = &config.security.audit_log_path else {
871                return Ok((
872                    400,
873                    "application/json",
874                    json!({"error": "audit log path is not configured"}).to_string(),
875                ));
876            };
877            let input = parse_optional_json_body::<SecurityAuditExportRequest>(request)
878                .map_err(std::io::Error::other)?;
879            let format = match input.format.as_deref() {
880                Some("json") => AuditExportFormat::Json,
881                Some("jsonl") | None => AuditExportFormat::Jsonl,
882                Some(other) => return Ok((
883                    400,
884                    "application/json",
885                    json!({"error": format!("invalid format `{other}`; expected json or jsonl")})
886                        .to_string(),
887                )),
888            };
889            let report = export_audit_events(
890                audit_path,
891                &AuditQuery {
892                    actor_id: input.actor_id,
893                    tenant_id: input.tenant_id,
894                    operation: input.operation,
895                    allowed: input.allowed,
896                    from_unix_ms: input.from_unix_ms,
897                    to_unix_ms: input.to_unix_ms,
898                    limit: input.limit,
899                },
900                input.output_path.as_deref().map(Path::new),
901                format,
902            )?;
903            Ok((200, "application/json", serde_json::to_string(&report)?))
904        }
905        ("GET", "/control/v1/state") => {
906            let control = state
907                .lock()
908                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
909            Ok((200, "application/json", control.snapshot_json().to_string()))
910        }
911        ("GET", "/control/v1/peers") => {
912            let control = state
913                .lock()
914                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
915            let payload = json!({
916                "node_id": control.profile.replication.node_id,
917                "advertise_addr": control.profile.replication.advertise_addr,
918                "peers": control.profile.replication.peers,
919                "sync_ack_quorum": control.profile.replication.sync_ack_quorum,
920                "replica_progress": control.replica_progress,
921            });
922            Ok((200, "application/json", payload.to_string()))
923        }
924        ("GET", "/control/v1/replication/log") => {
925            let from = query
926                .get("from")
927                .and_then(|raw| raw.parse::<u64>().ok())
928                .unwrap_or(1);
929            let limit = query
930                .get("limit")
931                .and_then(|raw| raw.parse::<usize>().ok())
932                .unwrap_or(256)
933                .min(1_024);
934
935            let control = state
936                .lock()
937                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
938            let payload = json!({
939                "from": from,
940                "limit": limit,
941                "entries": control.replication_log.entries_from(from, limit),
942                "last_log_index": control.runtime.last_log_index,
943                "last_log_term": control.runtime.last_log_term,
944                "commit_index": control.runtime.commit_index,
945            });
946            Ok((200, "application/json", payload.to_string()))
947        }
948        ("GET", "/control/v1/resilience") => {
949            let control = state
950                .lock()
951                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
952            Ok((
953                200,
954                "application/json",
955                control
956                    .resilience
957                    .to_json(control.chaos.active_count())
958                    .to_string(),
959            ))
960        }
961        ("GET", "/control/v1/observability/metrics-map") => {
962            let control = state
963                .lock()
964                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
965            let payload = json!({
966                "metrics": [
967                    {"name":"sqlrite_chunk_count","group":"storage","type":"gauge"},
968                    {"name":"sqlrite_schema_version","group":"storage","type":"gauge"},
969                    {"name":"sqlrite_index_entries","group":"retrieval","type":"gauge"},
970                    {"name":"sqlrite_ha_enabled","group":"ha","type":"gauge"},
971                    {"name":"sqlrite_ha_role","group":"ha","type":"gauge"},
972                    {"name":"sqlrite_ha_term","group":"ha","type":"gauge"},
973                    {"name":"sqlrite_ha_commit_index","group":"ha","type":"gauge"},
974                    {"name":"sqlrite_ha_last_log_index","group":"ha","type":"gauge"},
975                    {"name":"sqlrite_ha_last_log_term","group":"ha","type":"gauge"},
976                    {"name":"sqlrite_ha_replication_lag_ms","group":"ha","type":"gauge"},
977                    {"name":"sqlrite_ha_failover_events_total","group":"resilience","type":"counter"},
978                    {"name":"sqlrite_ha_failover_completed_total","group":"resilience","type":"counter"},
979                    {"name":"sqlrite_ha_restore_events_total","group":"resilience","type":"counter"},
980                    {"name":"sqlrite_ha_restore_completed_total","group":"resilience","type":"counter"},
981                    {"name":"sqlrite_ha_chaos_injections_total","group":"chaos","type":"counter"},
982                    {"name":"sqlrite_ha_chaos_blocked_requests_total","group":"chaos","type":"counter"},
983                    {"name":"sqlrite_requests_total","group":"http","type":"counter"},
984                    {"name":"sqlrite_requests_server_errors_total","group":"http","type":"counter"},
985                    {"name":"sqlrite_requests_client_errors_total","group":"http","type":"counter"},
986                    {"name":"sqlrite_requests_sql_total","group":"sql","type":"counter"},
987                    {"name":"sqlrite_requests_sql_errors_total","group":"sql","type":"counter"},
988                    {"name":"sqlrite_requests_sql_avg_latency_ms","group":"sql","type":"gauge"},
989                    {"name":"sqlrite_requests_sql_max_latency_ms","group":"sql","type":"gauge"},
990                    {"name":"sqlrite_observability_traces_buffered","group":"observability","type":"gauge"},
991                    {"name":"sqlrite_alert_simulations_total","group":"observability","type":"counter"}
992                ],
993                "state": control.observability.to_json(),
994            });
995            Ok((200, "application/json", payload.to_string()))
996        }
997        ("GET", "/control/v1/traces/recent") => {
998            let limit = query
999                .get("limit")
1000                .and_then(|raw| raw.parse::<usize>().ok())
1001                .unwrap_or(50)
1002                .min(512);
1003            let control = state
1004                .lock()
1005                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1006            let payload = json!({
1007                "limit": limit,
1008                "traces": control.observability.recent_traces(limit),
1009            });
1010            Ok((200, "application/json", payload.to_string()))
1011        }
1012        ("POST", "/control/v1/observability/reset") => {
1013            if !authorize_control_request(request, config) {
1014                return Ok(unauthorized_response());
1015            }
1016            let mut control = state
1017                .lock()
1018                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1019            control.observability.reset();
1020            let payload = json!({
1021                "status": "reset",
1022                "observability": control.observability.to_json(),
1023            });
1024            Ok((200, "application/json", payload.to_string()))
1025        }
1026        ("GET", "/control/v1/alerts/templates") => {
1027            let control = state
1028                .lock()
1029                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1030            let payload = json!({
1031                "templates": [
1032                    {
1033                        "id": "sql_error_rate_high",
1034                        "severity": "warning",
1035                        "threshold": {"sql_error_rate_gt": 0.05},
1036                        "description": "SQL API error rate exceeded 5%"
1037                    },
1038                    {
1039                        "id": "sql_latency_high",
1040                        "severity": "warning",
1041                        "threshold": {"sql_avg_latency_ms_gt": 50.0},
1042                        "description": "SQL API average latency exceeded 50ms"
1043                    },
1044                    {
1045                        "id": "replication_lag_high",
1046                        "severity": "critical",
1047                        "threshold": {"replication_lag_ms_gt": control.profile.replication.max_replication_lag_ms},
1048                        "description": "Replication lag exceeded configured max_replication_lag_ms"
1049                    },
1050                    {
1051                        "id": "restore_stuck",
1052                        "severity": "critical",
1053                        "threshold": {"restore_active_ms_gt": control.profile.recovery.snapshot_interval_seconds.saturating_mul(1_000)},
1054                        "description": "Restore workflow appears stuck beyond snapshot interval"
1055                    }
1056                ]
1057            });
1058            Ok((200, "application/json", payload.to_string()))
1059        }
1060        ("POST", "/control/v1/alerts/simulate") => {
1061            if !authorize_control_request(request, config) {
1062                return Ok(unauthorized_response());
1063            }
1064            let input = parse_optional_json_body::<AlertSimulationRequest>(request)
1065                .map_err(std::io::Error::other)?;
1066            let mut control = state
1067                .lock()
1068                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1069
1070            let observed_sql_error_rate = if control.observability.sql_requests_total == 0 {
1071                0.0
1072            } else {
1073                control.observability.sql_requests_failed_total as f64
1074                    / control.observability.sql_requests_total as f64
1075            };
1076            let observed_restore_active_ms = control
1077                .resilience
1078                .active_restore_started_unix_ms
1079                .map(|start| now_ms.saturating_sub(start))
1080                .unwrap_or(0);
1081            let eval_sql_error_rate = input.sql_error_rate.unwrap_or(observed_sql_error_rate);
1082            let eval_sql_avg_latency_ms = input
1083                .sql_avg_latency_ms
1084                .unwrap_or(control.observability.sql_avg_latency_ms());
1085            let eval_replication_lag_ms = input
1086                .replication_lag_ms
1087                .unwrap_or(control.runtime.replication_lag_ms);
1088            let eval_restore_active_ms = input
1089                .restore_active_ms
1090                .unwrap_or(observed_restore_active_ms);
1091
1092            let mut fired = Vec::<Value>::new();
1093            if eval_sql_error_rate > 0.05 {
1094                fired.push(json!({"id":"sql_error_rate_high","severity":"warning","value":eval_sql_error_rate}));
1095            }
1096            if eval_sql_avg_latency_ms > 50.0 {
1097                fired.push(json!({"id":"sql_latency_high","severity":"warning","value":eval_sql_avg_latency_ms}));
1098            }
1099            if eval_replication_lag_ms > control.profile.replication.max_replication_lag_ms {
1100                fired.push(json!({"id":"replication_lag_high","severity":"critical","value":eval_replication_lag_ms}));
1101            }
1102            if eval_restore_active_ms
1103                > control
1104                    .profile
1105                    .recovery
1106                    .snapshot_interval_seconds
1107                    .saturating_mul(1_000)
1108            {
1109                fired.push(json!({"id":"restore_stuck","severity":"critical","value":eval_restore_active_ms}));
1110            }
1111            control.observability.alert_simulations_total = control
1112                .observability
1113                .alert_simulations_total
1114                .saturating_add(1);
1115            let payload = json!({
1116                "fired_alerts": fired,
1117                "evaluated": {
1118                    "sql_error_rate": eval_sql_error_rate,
1119                    "sql_avg_latency_ms": eval_sql_avg_latency_ms,
1120                    "replication_lag_ms": eval_replication_lag_ms,
1121                    "restore_active_ms": eval_restore_active_ms,
1122                },
1123                "simulation_count": control.observability.alert_simulations_total,
1124            });
1125            Ok((200, "application/json", payload.to_string()))
1126        }
1127        ("GET", "/control/v1/slo/report") => {
1128            let control = state
1129                .lock()
1130                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1131            let availability_percent = if control.observability.requests_total == 0 {
1132                100.0
1133            } else {
1134                let successful = control
1135                    .observability
1136                    .requests_total
1137                    .saturating_sub(control.observability.requests_server_errors_total);
1138                (successful as f64 / control.observability.requests_total as f64) * 100.0
1139            };
1140            let rpo_seconds = (control.runtime.replication_lag_ms as f64) / 1_000.0;
1141            let payload = json!({
1142                "availability": {
1143                    "observed_percent": availability_percent,
1144                    "target_percent": 99.95,
1145                    "passes_target": availability_percent >= 99.95,
1146                    "requests_total": control.observability.requests_total,
1147                    "server_errors_total": control.observability.requests_server_errors_total,
1148                },
1149                "rpo": {
1150                    "observed_seconds": rpo_seconds,
1151                    "target_seconds": 60.0,
1152                    "passes_target": rpo_seconds <= 60.0,
1153                },
1154                "resilience": control.resilience.to_json(control.chaos.active_count()),
1155                "observability": control.observability.to_json(),
1156            });
1157            Ok((200, "application/json", payload.to_string()))
1158        }
1159        ("GET", "/control/v1/failover/status") => {
1160            let control = state
1161                .lock()
1162                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1163            let heartbeat_age_ms = control
1164                .runtime
1165                .last_heartbeat_unix_ms
1166                .map(|last| now_ms.saturating_sub(last));
1167            let payload = json!({
1168                "automatic_enabled": control.profile.replication.failover_mode == FailoverMode::Automatic,
1169                "heartbeat_age_ms": heartbeat_age_ms,
1170                "election_timeout_ms": control.profile.replication.election_timeout_ms,
1171                "role": control.runtime.role,
1172                "leader_id": control.runtime.leader_id,
1173                "failover_in_progress": control.runtime.failover_in_progress,
1174                "resilience": control.resilience.to_json(control.chaos.active_count()),
1175            });
1176            Ok((200, "application/json", payload.to_string()))
1177        }
1178        ("POST", "/control/v1/failover/auto-check") => {
1179            if !authorize_control_request(request, config) {
1180                return Ok(unauthorized_response());
1181            }
1182            let check = parse_optional_json_body::<AutoFailoverCheckRequest>(request)
1183                .map_err(std::io::Error::other)?;
1184            let mut control = state
1185                .lock()
1186                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1187            let event = maybe_trigger_automatic_failover(
1188                db_path,
1189                &mut control,
1190                AutoFailoverEvalInput {
1191                    force: check.force,
1192                    simulated_elapsed_ms: check.simulate_elapsed_ms,
1193                    reason: check.reason,
1194                },
1195            )?;
1196            let payload = json!({
1197                "triggered": event.is_some(),
1198                "event": event,
1199                "state": control.runtime,
1200                "resilience": control.resilience.to_json(control.chaos.active_count()),
1201            });
1202            Ok((200, "application/json", payload.to_string()))
1203        }
1204        ("POST", "/control/v1/failover/start") => {
1205            if !authorize_control_request(request, config) {
1206                return Ok(unauthorized_response());
1207            }
1208
1209            let mut control = state
1210                .lock()
1211                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1212            control.runtime.mark_failover_started();
1213            control.resilience.start_failover();
1214            persist_resilience_event(db_path, "failover_started", None, "manual_start")?;
1215            persist_runtime_marker(db_path, "failover_started", "true")?;
1216            Ok((
1217                200,
1218                "application/json",
1219                serde_json::to_string(&control.runtime)?,
1220            ))
1221        }
1222        ("POST", "/control/v1/failover/promote") => {
1223            if !authorize_control_request(request, config) {
1224                return Ok(unauthorized_response());
1225            }
1226
1227            let promote = parse_optional_json_body::<PromoteRequest>(request)
1228                .map_err(std::io::Error::other)?;
1229            let mut control = state
1230                .lock()
1231                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1232
1233            let leader = promote
1234                .leader_id
1235                .unwrap_or_else(|| control.profile.replication.node_id.clone());
1236            control.profile.replication.enabled = true;
1237            control.profile.replication.role = ServerRole::Primary;
1238            control.runtime.promote_to_primary(leader);
1239            if let Some(duration_ms) = control.resilience.complete_failover() {
1240                persist_resilience_event(
1241                    db_path,
1242                    "failover_completed",
1243                    Some(duration_ms),
1244                    "manual_promote",
1245                )?;
1246            }
1247            persist_runtime_marker(
1248                db_path,
1249                "last_role_transition",
1250                &serde_json::to_string(&control.runtime)?,
1251            )?;
1252            Ok((
1253                200,
1254                "application/json",
1255                serde_json::to_string(&control.runtime)?,
1256            ))
1257        }
1258        ("POST", "/control/v1/failover/step-down") => {
1259            if !authorize_control_request(request, config) {
1260                return Ok(unauthorized_response());
1261            }
1262
1263            let step_down = parse_optional_json_body::<StepDownRequest>(request)
1264                .map_err(std::io::Error::other)?;
1265            let mut control = state
1266                .lock()
1267                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1268
1269            control.profile.replication.enabled = true;
1270            control.profile.replication.role = ServerRole::Replica;
1271            control.runtime.step_down_to_replica(step_down.leader_id);
1272            persist_runtime_marker(
1273                db_path,
1274                "last_role_transition",
1275                &serde_json::to_string(&control.runtime)?,
1276            )?;
1277            Ok((
1278                200,
1279                "application/json",
1280                serde_json::to_string(&control.runtime)?,
1281            ))
1282        }
1283        ("POST", "/control/v1/recovery/mark-restored") => {
1284            if !authorize_control_request(request, config) {
1285                return Ok(unauthorized_response());
1286            }
1287
1288            let recovery = parse_optional_json_body::<RecoveryRequest>(request)
1289                .map_err(std::io::Error::other)?;
1290            let mut control = state
1291                .lock()
1292                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1293
1294            let event = format!(
1295                "backup_artifact={} note={}",
1296                recovery
1297                    .backup_artifact
1298                    .unwrap_or_else(|| "<unspecified>".to_string()),
1299                recovery.note.unwrap_or_else(|| "<none>".to_string())
1300            );
1301            control.runtime.mark_recovery_event(event.clone());
1302            if control.resilience.active_restore_started_unix_ms.is_none() {
1303                control.resilience.start_restore();
1304            }
1305            if let Some(duration_ms) = control.resilience.complete_restore() {
1306                persist_resilience_event(
1307                    db_path,
1308                    "restore_completed",
1309                    Some(duration_ms),
1310                    "recovery_mark_restored",
1311                )?;
1312            }
1313            persist_runtime_marker(db_path, "last_recovery_event", &event)?;
1314
1315            Ok((
1316                200,
1317                "application/json",
1318                serde_json::to_string(&control.runtime)?,
1319            ))
1320        }
1321        ("POST", "/control/v1/recovery/start") => {
1322            if !authorize_control_request(request, config) {
1323                return Ok(unauthorized_response());
1324            }
1325            let recovery = parse_optional_json_body::<RecoveryStartRequest>(request)
1326                .map_err(std::io::Error::other)?;
1327            let mut control = state
1328                .lock()
1329                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1330            control.resilience.start_restore();
1331            let note = recovery
1332                .note
1333                .unwrap_or_else(|| "restore_started".to_string());
1334            persist_resilience_event(db_path, "restore_started", None, &note)?;
1335            Ok((
1336                200,
1337                "application/json",
1338                json!({
1339                    "restore_in_progress": true,
1340                    "started_unix_ms": control.resilience.active_restore_started_unix_ms,
1341                })
1342                .to_string(),
1343            ))
1344        }
1345        ("POST", "/control/v1/recovery/snapshot") => {
1346            if !authorize_control_request(request, config) {
1347                return Ok(unauthorized_response());
1348            }
1349            let input = parse_optional_json_body::<RecoverySnapshotRequest>(request)
1350                .map_err(std::io::Error::other)?;
1351            let backup_dir = {
1352                let control = state
1353                    .lock()
1354                    .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1355                control.profile.recovery.backup_dir.clone()
1356            };
1357            let snapshot = create_backup_snapshot(db_path, &backup_dir, input.note.as_deref())?;
1358            let payload = json!({
1359                "snapshot": snapshot,
1360                "backup_dir": backup_dir,
1361            });
1362            Ok((200, "application/json", payload.to_string()))
1363        }
1364        ("GET", "/control/v1/recovery/snapshots") => {
1365            let backup_dir = {
1366                let control = state
1367                    .lock()
1368                    .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1369                control.profile.recovery.backup_dir.clone()
1370            };
1371            let limit = query
1372                .get("limit")
1373                .and_then(|raw| raw.parse::<usize>().ok())
1374                .unwrap_or(100)
1375                .min(1_000);
1376            let mut snapshots = list_backup_snapshots(&backup_dir)?;
1377            if snapshots.len() > limit {
1378                snapshots.truncate(limit);
1379            }
1380            let payload = json!({
1381                "backup_dir": backup_dir,
1382                "count": snapshots.len(),
1383                "snapshots": snapshots,
1384            });
1385            Ok((200, "application/json", payload.to_string()))
1386        }
1387        ("POST", "/control/v1/recovery/verify-restore") => {
1388            if !authorize_control_request(request, config) {
1389                return Ok(unauthorized_response());
1390            }
1391            let input = parse_optional_json_body::<RecoveryVerifyRestoreRequest>(request)
1392                .map_err(std::io::Error::other)?;
1393            let backup_dir = {
1394                let control = state
1395                    .lock()
1396                    .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1397                control.profile.recovery.backup_dir.clone()
1398            };
1399            let snapshots = list_backup_snapshots(&backup_dir)?;
1400            let selected = if let Some(snapshot_path) = input.snapshot_path.clone() {
1401                snapshots
1402                    .into_iter()
1403                    .find(|snapshot| snapshot.snapshot_path == snapshot_path)
1404                    .ok_or_else(|| {
1405                        std::io::Error::other("requested snapshot_path is not in backup catalog")
1406                    })?
1407            } else if let Some(target) = input.target_unix_ms {
1408                select_backup_snapshot_for_time(&backup_dir, target)?.ok_or_else(|| {
1409                    std::io::Error::other(
1410                        "no snapshot exists at or before requested target_unix_ms",
1411                    )
1412                })?
1413            } else {
1414                snapshots.into_iter().next().ok_or_else(|| {
1415                    std::io::Error::other("no snapshots available for restore verification")
1416                })?
1417            };
1418            let verify_dir = Path::new(&backup_dir).join("restore_verification");
1419            std::fs::create_dir_all(&verify_dir)?;
1420            let target_path = verify_dir.join(format!(
1421                "verify-{}-{}.db",
1422                selected.snapshot_id,
1423                unix_ms_now()
1424            ));
1425            let report = restore_backup_file_verified(&selected.snapshot_path, &target_path)?;
1426            let note = input.note.unwrap_or_else(|| "verify_restore".to_string());
1427            {
1428                let mut control = state
1429                    .lock()
1430                    .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1431                if control.resilience.active_restore_started_unix_ms.is_none() {
1432                    control.resilience.start_restore();
1433                    persist_resilience_event(db_path, "restore_started", None, &note)?;
1434                }
1435                let event = format!(
1436                    "verify_restore snapshot={} artifact={}",
1437                    selected.snapshot_id,
1438                    target_path.display()
1439                );
1440                control.runtime.mark_recovery_event(event.clone());
1441                if let Some(duration_ms) = control.resilience.complete_restore() {
1442                    persist_resilience_event(
1443                        db_path,
1444                        "restore_completed",
1445                        Some(duration_ms),
1446                        "verify_restore_completed",
1447                    )?;
1448                }
1449                persist_runtime_marker(db_path, "last_recovery_event", &event)?;
1450            }
1451
1452            let keep_artifact = input.keep_artifact;
1453            if !keep_artifact {
1454                let _ = std::fs::remove_file(&target_path);
1455                let _ = std::fs::remove_file(Path::new(&format!("{}-wal", target_path.display())));
1456                let _ = std::fs::remove_file(Path::new(&format!("{}-shm", target_path.display())));
1457            }
1458            let payload = json!({
1459                "selected_snapshot": selected,
1460                "restore_verified": report.integrity_check_ok,
1461                "verification": report,
1462                "restore_artifact_path": target_path.display().to_string(),
1463                "artifact_kept": keep_artifact,
1464            });
1465            Ok((200, "application/json", payload.to_string()))
1466        }
1467        ("POST", "/control/v1/recovery/prune-snapshots") => {
1468            if !authorize_control_request(request, config) {
1469                return Ok(unauthorized_response());
1470            }
1471            let input = parse_optional_json_body::<RecoveryPruneRequest>(request)
1472                .map_err(std::io::Error::other)?;
1473            let backup_dir;
1474            let retention_seconds;
1475            {
1476                let control = state
1477                    .lock()
1478                    .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1479                backup_dir = control.profile.recovery.backup_dir.clone();
1480                retention_seconds = input
1481                    .retention_seconds
1482                    .unwrap_or(control.profile.recovery.pitr_retention_seconds);
1483            }
1484            let report = prune_backup_snapshots(&backup_dir, retention_seconds)?;
1485            let payload = json!({
1486                "backup_dir": backup_dir,
1487                "report": report,
1488            });
1489            Ok((200, "application/json", payload.to_string()))
1490        }
1491        ("GET", "/control/v1/chaos/status") => {
1492            let control = state
1493                .lock()
1494                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1495            let payload = json!({
1496                "chaos": control.chaos.to_json(),
1497                "resilience": control.resilience.to_json(control.chaos.active_count()),
1498            });
1499            Ok((200, "application/json", payload.to_string()))
1500        }
1501        ("POST", "/control/v1/chaos/inject") => {
1502            if !authorize_control_request(request, config) {
1503                return Ok(unauthorized_response());
1504            }
1505            let inject = match parse_json_body::<ChaosInjectRequest>(request) {
1506                Ok(payload) => payload,
1507                Err(error) => {
1508                    return Ok((400, "application/json", json!({"error": error}).to_string()));
1509                }
1510            };
1511            let mut control = state
1512                .lock()
1513                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1514            let scenario = inject.scenario.as_str().to_string();
1515            control.chaos.inject(inject);
1516            control.resilience.chaos_injections_total =
1517                control.resilience.chaos_injections_total.saturating_add(1);
1518            persist_chaos_event(db_path, "inject", &scenario)?;
1519            let payload = json!({
1520                "status": "injected",
1521                "scenario": scenario,
1522                "chaos": control.chaos.to_json(),
1523            });
1524            Ok((200, "application/json", payload.to_string()))
1525        }
1526        ("POST", "/control/v1/chaos/clear") => {
1527            if !authorize_control_request(request, config) {
1528                return Ok(unauthorized_response());
1529            }
1530            let clear = parse_optional_json_body::<ChaosClearRequest>(request)
1531                .map_err(std::io::Error::other)?;
1532            let mut control = state
1533                .lock()
1534                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1535            let scenario_name = clear.scenario.map(|scenario| scenario.as_str().to_string());
1536            control.chaos.clear(clear.scenario);
1537            persist_chaos_event(db_path, "clear", scenario_name.as_deref().unwrap_or("all"))?;
1538            let payload = json!({
1539                "status": "cleared",
1540                "scenario": scenario_name,
1541                "chaos": control.chaos.to_json(),
1542            });
1543            Ok((200, "application/json", payload.to_string()))
1544        }
1545        ("POST", "/control/v1/replication/append") => {
1546            if !authorize_control_request(request, config) {
1547                return Ok(unauthorized_response());
1548            }
1549
1550            let append = match parse_json_body::<ReplicationAppendRequest>(request) {
1551                Ok(payload) => payload,
1552                Err(error) => {
1553                    return Ok((400, "application/json", json!({"error": error}).to_string()));
1554                }
1555            };
1556
1557            let mut control = state
1558                .lock()
1559                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1560            if !control.profile.replication.enabled || control.runtime.role != ServerRole::Primary {
1561                return Ok((
1562                    409,
1563                    "application/json",
1564                    json!({"error": "replication append requires enabled primary role"})
1565                        .to_string(),
1566                ));
1567            }
1568
1569            control.runtime.current_term = control.runtime.current_term.max(1);
1570            let operation = append
1571                .operation
1572                .unwrap_or_else(|| "state_mutation".to_string())
1573                .trim()
1574                .to_string();
1575            if operation.is_empty() {
1576                return Ok((
1577                    400,
1578                    "application/json",
1579                    json!({"error": "operation cannot be empty"}).to_string(),
1580                ));
1581            }
1582
1583            let current_term = control.runtime.current_term;
1584            let node_id = control.profile.replication.node_id.clone();
1585            let entry = match control.replication_log.append_leader_event(
1586                current_term,
1587                &node_id,
1588                operation,
1589                append.payload,
1590                &node_id,
1591            ) {
1592                Ok(entry) => entry,
1593                Err(error) => {
1594                    return Ok((400, "application/json", json!({"error": error}).to_string()));
1595                }
1596            };
1597            let last_index = control.replication_log.last_index();
1598            let last_term = control.replication_log.last_term();
1599            control.runtime.note_log_position(last_index, last_term);
1600
1601            let new_commit = control.replication_log.compute_commit_index(
1602                control.runtime.commit_index,
1603                control.profile.replication.sync_ack_quorum,
1604            );
1605            control.runtime.advance_commit_index(new_commit);
1606
1607            append_replication_entry_to_store(
1608                db_path,
1609                &entry,
1610                entry.index <= control.runtime.commit_index,
1611            )?;
1612            if new_commit > 0 {
1613                mark_committed_replication_entries(db_path, new_commit)?;
1614            }
1615
1616            let payload = json!({
1617                "entry": entry,
1618                "commit_index": control.runtime.commit_index,
1619                "required_quorum": control.profile.replication.sync_ack_quorum,
1620                "ack_count": control.replication_log.ack_count(control.runtime.last_log_index),
1621            });
1622            Ok((200, "application/json", payload.to_string()))
1623        }
1624        ("POST", "/control/v1/replication/receive") => {
1625            if !authorize_control_request(request, config) {
1626                return Ok(unauthorized_response());
1627            }
1628
1629            let receive = match parse_json_body::<ReplicationReceiveRequest>(request) {
1630                Ok(payload) => payload,
1631                Err(error) => {
1632                    return Ok((400, "application/json", json!({"error": error}).to_string()));
1633                }
1634            };
1635
1636            let mut control = state
1637                .lock()
1638                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1639            if !control.profile.replication.enabled {
1640                return Ok((
1641                    409,
1642                    "application/json",
1643                    json!({"error": "replication is not enabled"}).to_string(),
1644                ));
1645            }
1646            if receive.term < control.runtime.current_term {
1647                return Ok((
1648                    409,
1649                    "application/json",
1650                    json!({"error": "stale replication term", "term": control.runtime.current_term}).to_string(),
1651                ));
1652            }
1653
1654            control.runtime.adopt_term(receive.term);
1655            control
1656                .runtime
1657                .step_down_to_replica(Some(receive.leader_id.clone()));
1658
1659            if let Err(error) = control.replication_log.append_remote_entries(
1660                receive.prev_log_index,
1661                receive.prev_log_term,
1662                &receive.entries,
1663            ) {
1664                return Ok((409, "application/json", json!({"error": error}).to_string()));
1665            }
1666
1667            let last_index = control.replication_log.last_index();
1668            let last_term = control.replication_log.last_term();
1669            control.runtime.note_log_position(last_index, last_term);
1670            if let Some(leader_commit) = receive.leader_commit {
1671                control.runtime.advance_commit_index(leader_commit);
1672            }
1673
1674            rewrite_replication_log_store(
1675                db_path,
1676                &control.replication_log,
1677                control.runtime.commit_index,
1678            )?;
1679            let payload = json!({
1680                "accepted": true,
1681                "term": control.runtime.current_term,
1682                "last_log_index": control.runtime.last_log_index,
1683                "last_log_term": control.runtime.last_log_term,
1684                "commit_index": control.runtime.commit_index,
1685            });
1686            Ok((200, "application/json", payload.to_string()))
1687        }
1688        ("POST", "/control/v1/replication/ack") => {
1689            if !authorize_control_request(request, config) {
1690                return Ok(unauthorized_response());
1691            }
1692            let ack = match parse_json_body::<ReplicationAckRequest>(request) {
1693                Ok(payload) => payload,
1694                Err(error) => {
1695                    return Ok((400, "application/json", json!({"error": error}).to_string()));
1696                }
1697            };
1698
1699            let mut control = state
1700                .lock()
1701                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1702            if ack.index == 0 {
1703                return Ok((
1704                    400,
1705                    "application/json",
1706                    json!({"error": "ack index must be >= 1"}).to_string(),
1707                ));
1708            }
1709            let ack_count = control
1710                .replication_log
1711                .acknowledge(ack.index, ack.node_id.clone());
1712            if ack_count == 0 {
1713                return Ok((
1714                    409,
1715                    "application/json",
1716                    json!({"error": "ack index is outside local replication log"}).to_string(),
1717                ));
1718            }
1719            control
1720                .replica_progress
1721                .insert(ack.node_id.clone(), ack.index);
1722
1723            let new_commit = control.replication_log.compute_commit_index(
1724                control.runtime.commit_index,
1725                control.profile.replication.sync_ack_quorum,
1726            );
1727            control.runtime.advance_commit_index(new_commit);
1728            if new_commit > 0 {
1729                mark_committed_replication_entries(db_path, new_commit)?;
1730            }
1731
1732            let payload = json!({
1733                "node_id": ack.node_id,
1734                "index": ack.index,
1735                "ack_count": ack_count,
1736                "commit_index": control.runtime.commit_index,
1737                "required_quorum": control.profile.replication.sync_ack_quorum,
1738            });
1739            Ok((200, "application/json", payload.to_string()))
1740        }
1741        ("POST", "/control/v1/replication/reconcile") => {
1742            if !authorize_control_request(request, config) {
1743                return Ok(unauthorized_response());
1744            }
1745            let reconcile = parse_optional_json_body::<ReplicationReconcileRequest>(request)
1746                .map_err(std::io::Error::other)?;
1747            let mut control = state
1748                .lock()
1749                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1750            let node_id = reconcile
1751                .node_id
1752                .unwrap_or_else(|| "unknown-replica".to_string());
1753            let last_applied = reconcile.last_applied_index.unwrap_or(0);
1754            control
1755                .replica_progress
1756                .insert(node_id.clone(), last_applied);
1757
1758            if let Some(peer_commit) = reconcile.commit_index {
1759                // Replica reconciliation reports can only move commit forward up to local log head.
1760                control.runtime.advance_commit_index(peer_commit);
1761            }
1762            if let Some(lag_ms) = reconcile.replication_lag_ms {
1763                control.runtime.replication_lag_ms = lag_ms;
1764            } else {
1765                let lag_entries = control.runtime.commit_index.saturating_sub(last_applied);
1766                control.runtime.replication_lag_ms =
1767                    lag_entries.saturating_mul(control.profile.replication.heartbeat_interval_ms);
1768            }
1769
1770            let missing_from = last_applied.saturating_add(1);
1771            let missing_entries = control.replication_log.entries_from(missing_from, 128);
1772            persist_reconcile_event(
1773                db_path,
1774                &node_id,
1775                last_applied,
1776                control.runtime.commit_index,
1777                control.runtime.replication_lag_ms,
1778            )?;
1779
1780            let payload = json!({
1781                "node_id": node_id,
1782                "last_applied_index": last_applied,
1783                "commit_index": control.runtime.commit_index,
1784                "missing_entries_count": missing_entries.len(),
1785                "missing_entries": missing_entries,
1786                "replication_lag_ms": control.runtime.replication_lag_ms,
1787            });
1788            Ok((200, "application/json", payload.to_string()))
1789        }
1790        ("POST", "/control/v1/election/request-vote") => {
1791            if !authorize_control_request(request, config) {
1792                return Ok(unauthorized_response());
1793            }
1794            let vote = match parse_json_body::<ElectionVoteRequest>(request) {
1795                Ok(payload) => payload,
1796                Err(error) => {
1797                    return Ok((400, "application/json", json!({"error": error}).to_string()));
1798                }
1799            };
1800
1801            let mut control = state
1802                .lock()
1803                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1804            if !control.profile.replication.enabled {
1805                return Ok((
1806                    409,
1807                    "application/json",
1808                    json!({"error": "election requires replication mode"}).to_string(),
1809                ));
1810            }
1811
1812            let mut granted = false;
1813            let reason: String;
1814            if vote.term < control.runtime.current_term {
1815                reason = "stale_term".to_string();
1816            } else {
1817                control.runtime.adopt_term(vote.term);
1818                if control.runtime.can_grant_vote(
1819                    vote.term,
1820                    &vote.candidate_id,
1821                    vote.candidate_last_log_index,
1822                    vote.candidate_last_log_term,
1823                ) {
1824                    granted = true;
1825                    control
1826                        .runtime
1827                        .grant_vote(vote.term, vote.candidate_id.clone());
1828                    reason = "vote_granted".to_string();
1829                } else if control
1830                    .runtime
1831                    .voted_for
1832                    .as_ref()
1833                    .is_some_and(|value| value != &vote.candidate_id)
1834                {
1835                    reason = "already_voted".to_string();
1836                } else {
1837                    reason = "candidate_log_outdated".to_string();
1838                }
1839            }
1840
1841            persist_vote_event(
1842                db_path,
1843                control.runtime.current_term,
1844                &control.profile.replication.node_id,
1845                &vote.candidate_id,
1846                granted,
1847                &reason,
1848            )?;
1849
1850            let payload = json!({
1851                "term": control.runtime.current_term,
1852                "vote_granted": granted,
1853                "reason": reason,
1854                "voted_for": control.runtime.voted_for,
1855            });
1856            Ok((200, "application/json", payload.to_string()))
1857        }
1858        ("POST", "/control/v1/election/heartbeat") => {
1859            if !authorize_control_request(request, config) {
1860                return Ok(unauthorized_response());
1861            }
1862            let heartbeat = match parse_json_body::<ElectionHeartbeatRequest>(request) {
1863                Ok(payload) => payload,
1864                Err(error) => {
1865                    return Ok((400, "application/json", json!({"error": error}).to_string()));
1866                }
1867            };
1868
1869            let mut control = state
1870                .lock()
1871                .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1872            if heartbeat.term < control.runtime.current_term {
1873                return Ok((
1874                    409,
1875                    "application/json",
1876                    json!({"accepted": false, "reason": "stale_term", "term": control.runtime.current_term}).to_string(),
1877                ));
1878            }
1879
1880            control.runtime.adopt_term(heartbeat.term);
1881            if heartbeat.leader_id != control.profile.replication.node_id {
1882                control
1883                    .runtime
1884                    .step_down_to_replica(Some(heartbeat.leader_id.clone()));
1885            }
1886            if let Some(leader_last_log_index) = heartbeat.leader_last_log_index
1887                && leader_last_log_index > control.runtime.last_log_index
1888            {
1889                control.runtime.mark_failover_started();
1890            }
1891            control.runtime.mark_heartbeat(
1892                Some(heartbeat.leader_id.clone()),
1893                heartbeat.commit_index,
1894                heartbeat.replication_lag_ms.unwrap_or(0),
1895            );
1896            if control.runtime.failover_in_progress
1897                && let Some(duration_ms) = control.resilience.complete_failover()
1898            {
1899                persist_resilience_event(
1900                    db_path,
1901                    "failover_completed",
1902                    Some(duration_ms),
1903                    "heartbeat_stabilized",
1904                )?;
1905            }
1906            if control.runtime.commit_index > 0 {
1907                mark_committed_replication_entries(db_path, control.runtime.commit_index)?;
1908            }
1909
1910            let payload = json!({
1911                "accepted": true,
1912                "term": control.runtime.current_term,
1913                "leader_id": heartbeat.leader_id,
1914                "role": control.runtime.role,
1915                "commit_index": control.runtime.commit_index,
1916                "last_applied_index": control.runtime.last_applied_index,
1917            });
1918            Ok((200, "application/json", payload.to_string()))
1919        }
1920        ("GET", "/v1/openapi.json") => Ok((
1921            200,
1922            "application/json",
1923            openapi_query_document(config.enable_sql_endpoint).to_string(),
1924        )),
1925        ("POST", "/v1/sql") if config.enable_sql_endpoint => {
1926            let sql_request = match parse_json_body::<SqlApiRequest>(request) {
1927                Ok(payload) => payload,
1928                Err(error) => {
1929                    let payload = json!({"error": error});
1930                    return Ok((400, "application/json", payload.to_string()));
1931                }
1932            };
1933            execute_sql_api_statement(db_path, sql_profile, config, request, sql_request)
1934        }
1935        ("POST", "/v1/query") if config.enable_sql_endpoint => {
1936            let input = match parse_json_body::<QueryApiRequest>(request) {
1937                Ok(payload) => payload,
1938                Err(error) => {
1939                    let payload = json!({"error": error});
1940                    return Ok((400, "application/json", payload.to_string()));
1941                }
1942            };
1943            match execute_query_api(db, config, request, path, input, false) {
1944                Ok(payload) => Ok((200, "application/json", payload.to_string())),
1945                Err(error)
1946                    if matches!(
1947                        &error,
1948                        crate::SqlRiteError::Io(io_error)
1949                            if io_error.kind() == std::io::ErrorKind::PermissionDenied
1950                    ) =>
1951                {
1952                    Ok((
1953                        403,
1954                        "application/json",
1955                        json!({"error": error.to_string()}).to_string(),
1956                    ))
1957                }
1958                Err(error) => Ok((
1959                    400,
1960                    "application/json",
1961                    json!({"error": error.to_string()}).to_string(),
1962                )),
1963            }
1964        }
1965        ("POST", "/v1/query-compact") if config.enable_sql_endpoint => {
1966            let input = match parse_json_body::<QueryApiRequest>(request) {
1967                Ok(payload) => payload,
1968                Err(error) => {
1969                    let payload = json!({"error": error});
1970                    return Ok((400, "application/json", payload.to_string()));
1971                }
1972            };
1973            match execute_query_api(db, config, request, path, input, true) {
1974                Ok(payload) => Ok((200, "application/json", payload.to_string())),
1975                Err(error)
1976                    if matches!(
1977                        &error,
1978                        crate::SqlRiteError::Io(io_error)
1979                            if io_error.kind() == std::io::ErrorKind::PermissionDenied
1980                    ) =>
1981                {
1982                    Ok((
1983                        403,
1984                        "application/json",
1985                        json!({"error": error.to_string()}).to_string(),
1986                    ))
1987                }
1988                Err(error) => Ok((
1989                    400,
1990                    "application/json",
1991                    json!({"error": error.to_string()}).to_string(),
1992                )),
1993            }
1994        }
1995        ("POST", "/v1/rerank-hook") if config.enable_sql_endpoint => {
1996            let input = match parse_json_body::<RerankHookRequest>(request) {
1997                Ok(payload) => payload,
1998                Err(error) => {
1999                    let payload = json!({"error": error});
2000                    return Ok((400, "application/json", payload.to_string()));
2001                }
2002            };
2003            match execute_rerank_hook_api(db, config, request, path, input) {
2004                Ok(payload) => Ok((200, "application/json", payload.to_string())),
2005                Err(error)
2006                    if matches!(
2007                        &error,
2008                        crate::SqlRiteError::Io(io_error)
2009                            if io_error.kind() == std::io::ErrorKind::PermissionDenied
2010                    ) =>
2011                {
2012                    Ok((
2013                        403,
2014                        "application/json",
2015                        json!({"error": error.to_string()}).to_string(),
2016                    ))
2017                }
2018                Err(error) => Ok((
2019                    400,
2020                    "application/json",
2021                    json!({"error": error.to_string()}).to_string(),
2022                )),
2023            }
2024        }
2025        ("POST", "/grpc/sqlrite.v1.QueryService/Sql") if config.enable_sql_endpoint => {
2026            let sql_request = match parse_json_body::<SqlApiRequest>(request) {
2027                Ok(payload) => payload,
2028                Err(error) => {
2029                    let payload = json!({"error": error});
2030                    return Ok((400, "application/json", payload.to_string()));
2031                }
2032            };
2033            execute_sql_api_statement(db_path, sql_profile, config, request, sql_request)
2034        }
2035        ("POST", "/grpc/sqlrite.v1.QueryService/Query") if config.enable_sql_endpoint => {
2036            let input = match parse_json_body::<QueryApiRequest>(request) {
2037                Ok(payload) => payload,
2038                Err(error) => {
2039                    let payload = json!({"error": error});
2040                    return Ok((400, "application/json", payload.to_string()));
2041                }
2042            };
2043            match execute_query_api(db, config, request, path, input, false) {
2044                Ok(payload) => Ok((200, "application/json", payload.to_string())),
2045                Err(error)
2046                    if matches!(
2047                        &error,
2048                        crate::SqlRiteError::Io(io_error)
2049                            if io_error.kind() == std::io::ErrorKind::PermissionDenied
2050                    ) =>
2051                {
2052                    Ok((
2053                        403,
2054                        "application/json",
2055                        json!({"error": error.to_string()}).to_string(),
2056                    ))
2057                }
2058                Err(error) => Ok((
2059                    400,
2060                    "application/json",
2061                    json!({"error": error.to_string()}).to_string(),
2062                )),
2063            }
2064        }
2065        ("POST", _) if path.starts_with("/control/") => Ok((
2066            404,
2067            "application/json",
2068            json!({"error": "unknown control-plane endpoint"}).to_string(),
2069        )),
2070        ("GET", _) if path.starts_with("/control/") => Ok((
2071            404,
2072            "application/json",
2073            json!({"error": "unknown control-plane endpoint"}).to_string(),
2074        )),
2075        (method, "/v1/sql") if method != "POST" => Ok((
2076            405,
2077            "application/json",
2078            json!({"error": "method not allowed; use POST /v1/sql"}).to_string(),
2079        )),
2080        (method, "/v1/query") if method != "POST" => Ok((
2081            405,
2082            "application/json",
2083            json!({"error": "method not allowed; use POST /v1/query"}).to_string(),
2084        )),
2085        (method, "/v1/query-compact") if method != "POST" => Ok((
2086            405,
2087            "application/json",
2088            json!({"error": "method not allowed; use POST /v1/query-compact"}).to_string(),
2089        )),
2090        (method, "/v1/rerank-hook") if method != "POST" => Ok((
2091            405,
2092            "application/json",
2093            json!({"error": "method not allowed; use POST /v1/rerank-hook"}).to_string(),
2094        )),
2095        (method, "/grpc/sqlrite.v1.QueryService/Sql") if method != "POST" => Ok((
2096            405,
2097            "application/json",
2098            json!({"error": "method not allowed; use POST /grpc/sqlrite.v1.QueryService/Sql"})
2099                .to_string(),
2100        )),
2101        (method, "/grpc/sqlrite.v1.QueryService/Query") if method != "POST" => Ok((
2102            405,
2103            "application/json",
2104            json!({"error": "method not allowed; use POST /grpc/sqlrite.v1.QueryService/Query"})
2105                .to_string(),
2106        )),
2107        _ => Ok((404, "text/plain; charset=utf-8", "not found".to_string())),
2108    }
2109}
2110
2111fn execute_sql_api_statement(
2112    db_path: &Path,
2113    sql_profile: DurabilityProfile,
2114    config: &ServerConfig,
2115    request: &HttpRequest,
2116    input: SqlApiRequest,
2117) -> Result<(u16, &'static str, String)> {
2118    if let Err(response) = authorize_request(config, request, AccessOperation::SqlAdmin, "/v1/sql")
2119    {
2120        return Ok(response);
2121    }
2122
2123    let statement_len = input.statement.len();
2124    match execute_sdk_sql(db_path, sql_profile, input) {
2125        Ok(payload) => {
2126            audit_request(
2127                config,
2128                request,
2129                AccessOperation::SqlAdmin,
2130                true,
2131                json!({"path": request.path.as_str(), "statement_len": statement_len}),
2132            )?;
2133            Ok((200, "application/json", payload.to_string()))
2134        }
2135        Err(error) if error.is_validation() => Ok((
2136            400,
2137            "application/json",
2138            json!({"error": error.to_string()}).to_string(),
2139        )),
2140        Err(error) => Ok((
2141            500,
2142            "application/json",
2143            json!({"error": error.to_string()}).to_string(),
2144        )),
2145    }
2146}
2147
2148fn execute_query_api(
2149    db: &SqlRite,
2150    config: &ServerConfig,
2151    request: &HttpRequest,
2152    path: &str,
2153    mut input: QueryApiRequest,
2154    compact: bool,
2155) -> Result<Value> {
2156    let context = authorize_request(config, request, AccessOperation::Query, path).map_err(
2157        |(_, _, body)| {
2158            std::io::Error::new(
2159                std::io::ErrorKind::PermissionDenied,
2160                extract_error_message(&body),
2161            )
2162        },
2163    )?;
2164
2165    if let Some(context) = context {
2166        let tenant = context.tenant_id.clone();
2167        let filters = input.metadata_filters.get_or_insert_with(HashMap::new);
2168        if let Some(existing) = filters.get("tenant")
2169            && existing != &tenant
2170        {
2171            audit_request(
2172                config,
2173                request,
2174                AccessOperation::Query,
2175                false,
2176                json!({"path": path, "reason": "tenant filter mismatch"}),
2177            )?;
2178            return Err(std::io::Error::new(
2179                std::io::ErrorKind::PermissionDenied,
2180                "tenant filter mismatch",
2181            )
2182            .into());
2183        }
2184        filters.insert("tenant".to_string(), tenant);
2185    }
2186
2187    let envelope = execute_sdk_query(db, input).map_err(std::io::Error::other)?;
2188    audit_request(
2189        config,
2190        request,
2191        AccessOperation::Query,
2192        true,
2193        json!({"path": path, "row_count": envelope.row_count, "compact": compact}),
2194    )?;
2195    if compact {
2196        Ok(compact_query_envelope(envelope))
2197    } else {
2198        Ok(serde_json::to_value(envelope)?)
2199    }
2200}
2201
2202fn compact_query_envelope(envelope: sqlrite_sdk_core::QueryEnvelope<crate::SearchResult>) -> Value {
2203    let mut chunk_ids = Vec::with_capacity(envelope.rows.len());
2204    let mut hybrid_scores = Vec::with_capacity(envelope.rows.len());
2205    let mut vector_scores = Vec::with_capacity(envelope.rows.len());
2206    let mut text_scores = Vec::with_capacity(envelope.rows.len());
2207    let include_doc_ids = envelope.rows.iter().any(|row| !row.doc_id.is_empty());
2208    let include_contents = envelope.rows.iter().any(|row| !row.content.is_empty());
2209    let include_metadata = envelope.rows.iter().any(|row| !row.metadata.is_null());
2210    let mut doc_ids = if include_doc_ids {
2211        Some(Vec::with_capacity(envelope.rows.len()))
2212    } else {
2213        None
2214    };
2215    let mut contents = if include_contents {
2216        Some(Vec::with_capacity(envelope.rows.len()))
2217    } else {
2218        None
2219    };
2220    let mut metadata = if include_metadata {
2221        Some(Vec::with_capacity(envelope.rows.len()))
2222    } else {
2223        None
2224    };
2225
2226    for row in envelope.rows {
2227        chunk_ids.push(row.chunk_id);
2228        hybrid_scores.push(row.hybrid_score);
2229        vector_scores.push(row.vector_score);
2230        text_scores.push(row.text_score);
2231        if let Some(doc_ids) = &mut doc_ids {
2232            doc_ids.push(row.doc_id);
2233        }
2234        if let Some(contents) = &mut contents {
2235            contents.push(row.content);
2236        }
2237        if let Some(metadata_rows) = &mut metadata {
2238            metadata_rows.push(row.metadata);
2239        }
2240    }
2241
2242    let mut payload = serde_json::Map::new();
2243    payload.insert("kind".to_string(), json!("query_compact"));
2244    payload.insert("row_count".to_string(), json!(chunk_ids.len()));
2245    payload.insert("chunk_ids".to_string(), json!(chunk_ids));
2246    payload.insert("hybrid_scores".to_string(), json!(hybrid_scores));
2247    payload.insert("vector_scores".to_string(), json!(vector_scores));
2248    payload.insert("text_scores".to_string(), json!(text_scores));
2249    if let Some(doc_ids) = doc_ids {
2250        payload.insert("doc_ids".to_string(), json!(doc_ids));
2251    }
2252    if let Some(contents) = contents {
2253        payload.insert("contents".to_string(), json!(contents));
2254    }
2255    if let Some(metadata_rows) = metadata {
2256        payload.insert("metadata".to_string(), json!(metadata_rows));
2257    }
2258    Value::Object(payload)
2259}
2260
2261fn execute_rerank_hook_api(
2262    db: &SqlRite,
2263    config: &ServerConfig,
2264    request: &HttpRequest,
2265    path: &str,
2266    input: RerankHookRequest,
2267) -> Result<Value> {
2268    let context = authorize_request(config, request, AccessOperation::Query, path).map_err(
2269        |(_, _, body)| {
2270            std::io::Error::new(
2271                std::io::ErrorKind::PermissionDenied,
2272                extract_error_message(&body),
2273            )
2274        },
2275    )?;
2276
2277    let candidate_count = input.candidate_count.unwrap_or(25).max(1);
2278    let mut metadata_filters = input.metadata_filters.unwrap_or_default();
2279    if let Some(context) = context {
2280        if let Some(existing) = metadata_filters.get("tenant")
2281            && existing != &context.tenant_id
2282        {
2283            audit_request(
2284                config,
2285                request,
2286                AccessOperation::Query,
2287                false,
2288                json!({"path": path, "reason": "tenant filter mismatch"}),
2289            )?;
2290            return Err(std::io::Error::new(
2291                std::io::ErrorKind::PermissionDenied,
2292                "tenant filter mismatch",
2293            )
2294            .into());
2295        }
2296        metadata_filters.insert("tenant".to_string(), context.tenant_id);
2297    }
2298
2299    let request_model = crate::SearchRequest {
2300        query_text: input.query_text,
2301        query_embedding: input.query_embedding,
2302        top_k: candidate_count,
2303        alpha: input.alpha.unwrap_or(sqlrite_sdk_core::DEFAULT_ALPHA),
2304        candidate_limit: input
2305            .candidate_limit
2306            .unwrap_or(sqlrite_sdk_core::DEFAULT_CANDIDATE_LIMIT)
2307            .max(candidate_count),
2308        query_profile: parse_query_profile_api(input.query_profile.as_deref())
2309            .map_err(SqlRiteError::InvalidBenchmarkConfig)?,
2310        metadata_filters,
2311        doc_id: input.doc_id,
2312        ..crate::SearchRequest::default()
2313    };
2314    request_model
2315        .validate()
2316        .map_err(|error| SqlRiteError::InvalidBenchmarkConfig(error.to_string()))?;
2317    let rows = db.search(request_model)?;
2318    audit_request(
2319        config,
2320        request,
2321        AccessOperation::Query,
2322        true,
2323        json!({"path": path, "row_count": rows.len(), "kind": "rerank_hook"}),
2324    )?;
2325    Ok(json!({
2326        "kind": "rerank_hook",
2327        "row_count": rows.len(),
2328        "rows": rows,
2329    }))
2330}
2331
2332fn parse_query_profile_api(value: Option<&str>) -> std::result::Result<QueryProfile, String> {
2333    match value.map(str::trim).filter(|value| !value.is_empty()) {
2334        None => Ok(QueryProfile::Balanced),
2335        Some("balanced") => Ok(QueryProfile::Balanced),
2336        Some("latency") => Ok(QueryProfile::Latency),
2337        Some("recall") => Ok(QueryProfile::Recall),
2338        Some(other) => Err(format!(
2339            "invalid query_profile `{other}`; expected balanced|latency|recall"
2340        )),
2341    }
2342}
2343
2344fn openapi_query_document(sql_enabled: bool) -> Value {
2345    let mut paths = serde_json::Map::new();
2346    if sql_enabled {
2347        paths.insert(
2348            "/v1/sql".to_string(),
2349            json!({
2350                "post": {
2351                    "summary": "Execute retrieval SQL statement",
2352                    "requestBody": {
2353                        "required": true,
2354                        "content": {
2355                            "application/json": {
2356                                "schema": {"$ref": "#/components/schemas/SqlRequest"}
2357                            }
2358                        }
2359                    },
2360                    "responses": {
2361                        "200": {"description": "Statement executed"},
2362                        "400": {"description": "Invalid SQL request"}
2363                    }
2364                }
2365            }),
2366        );
2367        paths.insert(
2368            "/v1/query".to_string(),
2369            json!({
2370                "post": {
2371                    "summary": "Run semantic/lexical/hybrid retrieval query",
2372                    "requestBody": {
2373                        "required": true,
2374                        "content": {
2375                            "application/json": {
2376                                "schema": {"$ref": "#/components/schemas/QueryRequest"}
2377                            }
2378                        }
2379                    },
2380                    "responses": {
2381                        "200": {"description": "Query results"},
2382                        "400": {"description": "Invalid query request"}
2383                    }
2384                }
2385            }),
2386        );
2387        paths.insert(
2388            "/v1/query-compact".to_string(),
2389            json!({
2390                "post": {
2391                    "summary": "Run retrieval query with compact array-oriented response for agents and benchmarks",
2392                    "requestBody": {
2393                        "required": true,
2394                        "content": {
2395                            "application/json": {
2396                                "schema": {"$ref": "#/components/schemas/QueryRequest"}
2397                            }
2398                        }
2399                    },
2400                    "responses": {
2401                        "200": {"description": "Compact query results"},
2402                        "400": {"description": "Invalid query request"}
2403                    }
2404                }
2405            }),
2406        );
2407        paths.insert(
2408            "/v1/rerank-hook".to_string(),
2409            json!({
2410                "post": {
2411                    "summary": "Produce scored candidate features for external rerankers",
2412                    "requestBody": {
2413                        "required": true,
2414                        "content": {
2415                            "application/json": {
2416                                "schema": {"$ref": "#/components/schemas/RerankHookRequest"}
2417                            }
2418                        }
2419                    },
2420                    "responses": {
2421                        "200": {"description": "Rerank candidate payload"},
2422                        "400": {"description": "Invalid rerank hook request"}
2423                    }
2424                }
2425            }),
2426        );
2427        paths.insert(
2428            "/grpc/sqlrite.v1.QueryService/Sql".to_string(),
2429            json!({
2430                "post": {
2431                    "summary": "gRPC-compat SQL query method over HTTP JSON bridge",
2432                    "requestBody": {
2433                        "required": true,
2434                        "content": {
2435                            "application/json": {
2436                                "schema": {"$ref": "#/components/schemas/SqlRequest"}
2437                            }
2438                        }
2439                    },
2440                    "responses": {
2441                        "200": {"description": "Statement executed"},
2442                        "400": {"description": "Invalid SQL request"}
2443                    }
2444                }
2445            }),
2446        );
2447        paths.insert(
2448            "/grpc/sqlrite.v1.QueryService/Query".to_string(),
2449            json!({
2450                "post": {
2451                    "summary": "gRPC-compat retrieval query method over HTTP JSON bridge",
2452                    "requestBody": {
2453                        "required": true,
2454                        "content": {
2455                            "application/json": {
2456                                "schema": {"$ref": "#/components/schemas/QueryRequest"}
2457                            }
2458                        }
2459                    },
2460                    "responses": {
2461                        "200": {"description": "Query results"},
2462                        "400": {"description": "Invalid query request"}
2463                    }
2464                }
2465            }),
2466        );
2467    }
2468    paths.insert(
2469        "/v1/openapi.json".to_string(),
2470        json!({
2471            "get": {
2472                "summary": "Fetch OpenAPI document for query surfaces",
2473                "responses": {
2474                    "200": {"description": "OpenAPI document"}
2475                }
2476            }
2477        }),
2478    );
2479
2480    json!({
2481        "openapi": "3.1.0",
2482        "info": {
2483            "title": "SQLRite Query API",
2484            "version": env!("CARGO_PKG_VERSION"),
2485            "description": "OpenAPI baseline for SQL and retrieval query surfaces."
2486        },
2487        "paths": paths,
2488        "components": {
2489            "schemas": {
2490                "SqlRequest": {
2491                    "type": "object",
2492                    "required": ["statement"],
2493                    "properties": {
2494                        "statement": {"type": "string"}
2495                    }
2496                },
2497                "QueryRequest": {
2498                    "type": "object",
2499                    "properties": {
2500                        "query_text": {"type": "string"},
2501                        "query_embedding": {
2502                            "type": "array",
2503                            "items": {"type": "number"}
2504                        },
2505                        "top_k": {"type": "integer", "minimum": 1},
2506                        "alpha": {"type": "number", "minimum": 0.0, "maximum": 1.0},
2507                        "candidate_limit": {"type": "integer", "minimum": 1},
2508                        "include_payloads": {"type": "boolean"},
2509                        "query_profile": {
2510                            "type": "string",
2511                            "enum": ["balanced", "latency", "recall"]
2512                        },
2513                        "metadata_filters": {
2514                            "type": "object",
2515                            "additionalProperties": {"type": "string"}
2516                        },
2517                        "doc_id": {"type": "string"}
2518                    }
2519                },
2520                "RerankHookRequest": {
2521                    "type": "object",
2522                    "properties": {
2523                        "query_text": {"type": "string"},
2524                        "query_embedding": {
2525                            "type": "array",
2526                            "items": {"type": "number"}
2527                        },
2528                        "candidate_count": {"type": "integer", "minimum": 1},
2529                        "alpha": {"type": "number", "minimum": 0.0, "maximum": 1.0},
2530                        "candidate_limit": {"type": "integer", "minimum": 1},
2531                        "query_profile": {
2532                            "type": "string",
2533                            "enum": ["balanced", "latency", "recall"]
2534                        },
2535                        "metadata_filters": {
2536                            "type": "object",
2537                            "additionalProperties": {"type": "string"}
2538                        },
2539                        "doc_id": {"type": "string"}
2540                    }
2541                }
2542            }
2543        }
2544    })
2545}
2546
2547fn split_path_and_query(path: &str) -> (&str, HashMap<String, String>) {
2548    let mut parts = path.splitn(2, '?');
2549    let raw_path = parts.next().unwrap_or("/");
2550    let query = parts.next().unwrap_or_default();
2551    (raw_path, parse_query_params(query))
2552}
2553
/// Parses an `&`-separated `key=value` query string into a map.
///
/// Keys and values are trimmed of surrounding whitespace. Pairs whose key is
/// empty after trimming are skipped, a key without `=` maps to an empty
/// value, and a repeated key keeps its last value. No percent-decoding is
/// performed.
fn parse_query_params(raw: &str) -> HashMap<String, String> {
    let mut params = HashMap::new();
    for pair in raw.split('&') {
        // Only the first `=` separates key from value; later ones stay in the value.
        let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
        let key = key.trim();
        if key.is_empty() {
            continue;
        }
        params.insert(key.to_string(), value.trim().to_string());
    }
    params
}
2568
2569fn authorize_control_request(request: &HttpRequest, config: &ServerConfig) -> bool {
2570    let Some(expected) = &config.control_api_token else {
2571        return true;
2572    };
2573
2574    request
2575        .headers
2576        .get("x-sqlrite-control-token")
2577        .is_some_and(|provided| provided == expected)
2578}
2579
2580fn unauthorized_response() -> (u16, &'static str, String) {
2581    (
2582        401,
2583        "application/json",
2584        json!({"error": "unauthorized control-plane request"}).to_string(),
2585    )
2586}
2587
2588fn authorize_request(
2589    config: &ServerConfig,
2590    request: &HttpRequest,
2591    operation: AccessOperation,
2592    path: &str,
2593) -> std::result::Result<Option<AccessContext>, (u16, &'static str, String)> {
2594    if !config.security.enabled() {
2595        return Ok(None);
2596    }
2597
2598    let Some(context) = extract_access_context(request, config) else {
2599        let response = (
2600            401,
2601            "application/json",
2602            json!({"error": "missing auth context headers"}).to_string(),
2603        );
2604        let _ = audit_request(
2605            config,
2606            request,
2607            operation,
2608            false,
2609            json!({"path": path, "reason": "missing_auth_context"}),
2610        );
2611        return Err(response);
2612    };
2613
2614    if let Some(policy) = &config.security.policy
2615        && let Err(error) = policy.authorize(&context, operation, &context.tenant_id)
2616    {
2617        let response = (
2618            403,
2619            "application/json",
2620            json!({"error": error.to_string()}).to_string(),
2621        );
2622        let _ = audit_request(
2623            config,
2624            request,
2625            operation,
2626            false,
2627            json!({"path": path, "reason": error.to_string()}),
2628        );
2629        return Err(response);
2630    }
2631
2632    Ok(Some(context))
2633}
2634
2635fn extract_access_context(request: &HttpRequest, config: &ServerConfig) -> Option<AccessContext> {
2636    let actor_id = request.headers.get("x-sqlrite-actor-id").cloned();
2637    let tenant_id = request.headers.get("x-sqlrite-tenant-id").cloned();
2638    let roles = request
2639        .headers
2640        .get("x-sqlrite-roles")
2641        .map(|value| {
2642            value
2643                .split(',')
2644                .map(str::trim)
2645                .filter(|value| !value.is_empty())
2646                .map(str::to_string)
2647                .collect::<Vec<_>>()
2648        })
2649        .unwrap_or_default();
2650
2651    match (actor_id, tenant_id) {
2652        (Some(actor_id), Some(tenant_id)) => {
2653            Some(AccessContext::new(actor_id, tenant_id).with_roles(roles))
2654        }
2655        _ if config.security.require_auth_context || config.security.enabled() => None,
2656        _ => None,
2657    }
2658}
2659
2660fn audit_request(
2661    config: &ServerConfig,
2662    request: &HttpRequest,
2663    operation: AccessOperation,
2664    allowed: bool,
2665    detail: Value,
2666) -> Result<()> {
2667    let Some(path) = &config.security.audit_log_path else {
2668        return Ok(());
2669    };
2670
2671    let context = extract_access_context(request, config)
2672        .unwrap_or_else(|| AccessContext::new("anonymous", "unknown"));
2673    let logger = JsonlAuditLogger::new(path, config.security.audit_redacted_fields.clone())?;
2674    logger.log(&AuditEvent {
2675        unix_ms: unix_ms_now(),
2676        actor_id: context.actor_id,
2677        tenant_id: context.tenant_id,
2678        operation,
2679        allowed,
2680        detail,
2681    })
2682}
2683
2684fn security_summary_json(config: &ServerConfig) -> Value {
2685    json!({
2686        "enabled": config.security.enabled(),
2687        "secure_defaults": config.security.secure_defaults,
2688        "require_auth_context": config.security.require_auth_context,
2689        "audit_log_path": config.security.audit_log_path,
2690        "rbac_roles": config.security.policy.as_ref().map(|policy| policy.role_names()).unwrap_or_default(),
2691    })
2692}
2693
2694fn extract_error_message(body: &str) -> String {
2695    serde_json::from_str::<Value>(body)
2696        .ok()
2697        .and_then(|value| {
2698            value
2699                .get("error")
2700                .and_then(Value::as_str)
2701                .map(str::to_string)
2702        })
2703        .unwrap_or_else(|| body.to_string())
2704}
2705
2706fn parse_json_body<T: for<'de> Deserialize<'de>>(
2707    request: &HttpRequest,
2708) -> std::result::Result<T, String> {
2709    if request.body.is_empty() {
2710        return Err("JSON body is required".to_string());
2711    }
2712    serde_json::from_slice::<T>(&request.body)
2713        .map_err(|error| format!("invalid JSON body: {error}"))
2714}
2715
2716fn parse_optional_json_body<T>(request: &HttpRequest) -> std::result::Result<T, String>
2717where
2718    T: for<'de> Deserialize<'de> + Default,
2719{
2720    if request.body.is_empty() {
2721        return Ok(T::default());
2722    }
2723    parse_json_body::<T>(request)
2724}
2725
2726#[derive(Debug, serde::Serialize)]
2727struct AutoFailoverEvent {
2728    promoted: bool,
2729    reason: String,
2730    term: u64,
2731    leader_id: String,
2732    failover_duration_ms: Option<u64>,
2733}
2734
/// Inputs controlling a single automatic-failover evaluation.
#[derive(Debug)]
struct AutoFailoverEvalInput {
    /// Promote even when the election timeout has not elapsed.
    force: bool,
    /// Heartbeat age in milliseconds to use instead of the measured one.
    simulated_elapsed_ms: Option<u64>,
    /// Reason to record; a default is used when absent.
    reason: Option<String>,
}
2741
2742fn maybe_trigger_automatic_failover(
2743    db_path: &Path,
2744    control: &mut ControlPlaneState,
2745    input: AutoFailoverEvalInput,
2746) -> Result<Option<AutoFailoverEvent>> {
2747    if !control.profile.replication.enabled {
2748        return Ok(None);
2749    }
2750    if control.profile.replication.failover_mode != FailoverMode::Automatic {
2751        return Ok(None);
2752    }
2753    if control.runtime.role == ServerRole::Primary {
2754        return Ok(None);
2755    }
2756
2757    let heartbeat_elapsed_ms = if let Some(simulated) = input.simulated_elapsed_ms {
2758        simulated
2759    } else if let Some(last) = control.runtime.last_heartbeat_unix_ms {
2760        unix_ms_now().saturating_sub(last)
2761    } else if input.force {
2762        control
2763            .profile
2764            .replication
2765            .election_timeout_ms
2766            .saturating_add(1)
2767    } else {
2768        return Ok(None);
2769    };
2770    let timed_out = heartbeat_elapsed_ms >= control.profile.replication.election_timeout_ms;
2771    if !input.force && !timed_out {
2772        return Ok(None);
2773    }
2774
2775    let reason = input
2776        .reason
2777        .unwrap_or_else(|| "automatic_failover_timeout".to_string());
2778    control.runtime.mark_failover_started();
2779    control.resilience.start_failover();
2780    persist_resilience_event(db_path, "failover_started", None, &reason)?;
2781
2782    let node_id = control.profile.replication.node_id.clone();
2783    control.profile.replication.role = ServerRole::Primary;
2784    control.runtime.promote_to_primary(node_id.clone());
2785    let duration = control.resilience.complete_failover();
2786    if let Some(duration_ms) = duration {
2787        persist_resilience_event(
2788            db_path,
2789            "failover_completed",
2790            Some(duration_ms),
2791            "automatic_failover_promote",
2792        )?;
2793    }
2794    persist_runtime_marker(
2795        db_path,
2796        "last_role_transition",
2797        &serde_json::to_string(&control.runtime)?,
2798    )?;
2799
2800    let entry = control
2801        .replication_log
2802        .append_leader_event(
2803            control.runtime.current_term.max(1),
2804            &node_id,
2805            "automatic_failover_promote".to_string(),
2806            json!({
2807                "reason": reason,
2808                "heartbeat_elapsed_ms": heartbeat_elapsed_ms,
2809                "election_timeout_ms": control.profile.replication.election_timeout_ms,
2810            }),
2811            &node_id,
2812        )
2813        .map_err(std::io::Error::other)?;
2814    let last_index = control.replication_log.last_index();
2815    let last_term = control.replication_log.last_term();
2816    control.runtime.note_log_position(last_index, last_term);
2817    let new_commit = control.replication_log.compute_commit_index(
2818        control.runtime.commit_index,
2819        control.profile.replication.sync_ack_quorum,
2820    );
2821    control.runtime.advance_commit_index(new_commit);
2822    append_replication_entry_to_store(
2823        db_path,
2824        &entry,
2825        entry.index <= control.runtime.commit_index,
2826    )?;
2827    if new_commit > 0 {
2828        mark_committed_replication_entries(db_path, new_commit)?;
2829    }
2830
2831    Ok(Some(AutoFailoverEvent {
2832        promoted: true,
2833        reason,
2834        term: control.runtime.current_term,
2835        leader_id: node_id,
2836        failover_duration_ms: duration,
2837    }))
2838}
2839
2840fn chaos_blocking_response(
2841    control: &mut ControlPlaneState,
2842    request: &HttpRequest,
2843    path: &str,
2844) -> Option<(u16, &'static str, String)> {
2845    if control.chaos.has(ChaosScenario::NodeCrash) {
2846        control.resilience.chaos_blocked_requests_total = control
2847            .resilience
2848            .chaos_blocked_requests_total
2849            .saturating_add(1);
2850        return Some((
2851            503,
2852            "application/json",
2853            json!({
2854                "error": "chaos fault active: node_crash",
2855                "path": path,
2856            })
2857            .to_string(),
2858        ));
2859    }
2860
2861    if control.chaos.has(ChaosScenario::DiskFull) {
2862        let blocks = path == "/control/v1/replication/append"
2863            || path == "/v1/sql"
2864            || path == "/grpc/sqlrite.v1.QueryService/Sql"
2865            || path == "/control/v1/recovery/start"
2866            || path == "/control/v1/recovery/mark-restored"
2867            || path == "/control/v1/recovery/snapshot"
2868            || path == "/control/v1/recovery/verify-restore"
2869            || path == "/control/v1/recovery/prune-snapshots";
2870        if blocks && request.method == "POST" {
2871            control.resilience.chaos_blocked_requests_total = control
2872                .resilience
2873                .chaos_blocked_requests_total
2874                .saturating_add(1);
2875            return Some((
2876                507,
2877                "application/json",
2878                json!({
2879                    "error": "chaos fault active: disk_full",
2880                    "path": path,
2881                })
2882                .to_string(),
2883            ));
2884        }
2885    }
2886
2887    if control.chaos.has(ChaosScenario::PartitionSubset)
2888        && matches!(
2889            path,
2890            "/control/v1/election/heartbeat" | "/control/v1/replication/receive"
2891        )
2892    {
2893        control.resilience.chaos_blocked_requests_total = control
2894            .resilience
2895            .chaos_blocked_requests_total
2896            .saturating_add(1);
2897        return Some((
2898            503,
2899            "application/json",
2900            json!({
2901                "error": "chaos fault active: partition_subset",
2902                "path": path,
2903            })
2904            .to_string(),
2905        ));
2906    }
2907
2908    None
2909}
2910
2911fn restore_replication_state(db_path: &Path, control: &mut ControlPlaneState) -> Result<()> {
2912    let conn = Connection::open(db_path)?;
2913    ensure_replication_catalog(&conn)?;
2914
2915    let mut stmt = conn.prepare(
2916        "
2917        SELECT idx, term, leader_id, operation, payload_json, checksum, created_at_unix_ms, committed
2918        FROM replication_log
2919        ORDER BY idx ASC
2920        ",
2921    )?;
2922    let mut rows = stmt.query([])?;
2923    let mut entries = Vec::new();
2924    let mut commit_index = 0u64;
2925
2926    while let Some(row) = rows.next()? {
2927        let index: u64 = row.get(0)?;
2928        let term: u64 = row.get(1)?;
2929        let leader_id: String = row.get(2)?;
2930        let operation: String = row.get(3)?;
2931        let payload_json: String = row.get(4)?;
2932        let checksum: String = row.get(5)?;
2933        let created_at_unix_ms: u64 = row.get(6)?;
2934        let committed: i64 = row.get(7)?;
2935        let payload = serde_json::from_str::<Value>(&payload_json)?;
2936        entries.push(ReplicationLogEntry {
2937            index,
2938            term,
2939            leader_id,
2940            operation,
2941            payload,
2942            checksum,
2943            created_at_unix_ms,
2944        });
2945        if committed == 1 {
2946            commit_index = index;
2947        }
2948    }
2949
2950    control.replication_log =
2951        ReplicationLog::from_entries(entries).map_err(std::io::Error::other)?;
2952    control.runtime.note_log_position(
2953        control.replication_log.last_index(),
2954        control.replication_log.last_term(),
2955    );
2956    control.runtime.advance_commit_index(commit_index);
2957    control.runtime.current_term = control
2958        .runtime
2959        .current_term
2960        .max(control.runtime.last_log_term);
2961
2962    let voted_for: Option<String> = conn
2963        .query_row(
2964            "
2965            SELECT candidate_id
2966            FROM election_votes
2967            WHERE term = ?1 AND voter_id = ?2 AND granted = 1
2968            ORDER BY rowid DESC
2969            LIMIT 1
2970            ",
2971            params![
2972                control.runtime.current_term,
2973                control.profile.replication.node_id.as_str()
2974            ],
2975            |row| row.get(0),
2976        )
2977        .optional()?;
2978    control.runtime.voted_for = voted_for;
2979
2980    control.resilience.failover_events_total = conn.query_row(
2981        "SELECT COUNT(*) FROM ha_resilience_events WHERE event_type = 'failover_started'",
2982        [],
2983        |row| row.get(0),
2984    )?;
2985    control.resilience.failover_completed_total = conn.query_row(
2986        "SELECT COUNT(*) FROM ha_resilience_events WHERE event_type = 'failover_completed'",
2987        [],
2988        |row| row.get(0),
2989    )?;
2990    control.resilience.cumulative_failover_duration_ms = conn
2991        .query_row(
2992            "SELECT COALESCE(SUM(duration_ms), 0) FROM ha_resilience_events WHERE event_type = 'failover_completed'",
2993            [],
2994            |row| row.get::<_, i64>(0),
2995        )?
2996        .max(0) as u128;
2997    control.resilience.last_failover_duration_ms = conn
2998        .query_row(
2999            "SELECT duration_ms FROM ha_resilience_events WHERE event_type = 'failover_completed' ORDER BY rowid DESC LIMIT 1",
3000            [],
3001            |row| row.get::<_, i64>(0),
3002        )
3003        .optional()?
3004        .map(|value| value.max(0) as u64);
3005
3006    control.resilience.restore_events_total = conn.query_row(
3007        "SELECT COUNT(*) FROM ha_resilience_events WHERE event_type = 'restore_started'",
3008        [],
3009        |row| row.get(0),
3010    )?;
3011    control.resilience.restore_completed_total = conn.query_row(
3012        "SELECT COUNT(*) FROM ha_resilience_events WHERE event_type = 'restore_completed'",
3013        [],
3014        |row| row.get(0),
3015    )?;
3016    control.resilience.cumulative_restore_duration_ms = conn
3017        .query_row(
3018            "SELECT COALESCE(SUM(duration_ms), 0) FROM ha_resilience_events WHERE event_type = 'restore_completed'",
3019            [],
3020            |row| row.get::<_, i64>(0),
3021        )?
3022        .max(0) as u128;
3023    control.resilience.last_restore_duration_ms = conn
3024        .query_row(
3025            "SELECT duration_ms FROM ha_resilience_events WHERE event_type = 'restore_completed' ORDER BY rowid DESC LIMIT 1",
3026            [],
3027            |row| row.get::<_, i64>(0),
3028        )
3029        .optional()?
3030        .map(|value| value.max(0) as u64);
3031
3032    control.resilience.chaos_injections_total = conn.query_row(
3033        "SELECT COUNT(*) FROM ha_chaos_events WHERE action = 'inject'",
3034        [],
3035        |row| row.get(0),
3036    )?;
3037
3038    Ok(())
3039}
3040
3041fn ensure_replication_catalog(conn: &Connection) -> Result<()> {
3042    conn.execute_batch(
3043        "
3044        CREATE TABLE IF NOT EXISTS replication_log (
3045            idx INTEGER PRIMARY KEY,
3046            term INTEGER NOT NULL,
3047            leader_id TEXT NOT NULL,
3048            operation TEXT NOT NULL,
3049            payload_json TEXT NOT NULL,
3050            checksum TEXT NOT NULL,
3051            created_at_unix_ms INTEGER NOT NULL,
3052            committed INTEGER NOT NULL DEFAULT 0
3053        );
3054
3055        CREATE TABLE IF NOT EXISTS election_votes (
3056            term INTEGER NOT NULL,
3057            voter_id TEXT NOT NULL,
3058            candidate_id TEXT NOT NULL,
3059            granted INTEGER NOT NULL,
3060            reason TEXT NOT NULL,
3061            created_at TEXT NOT NULL DEFAULT (datetime('now'))
3062        );
3063
3064        CREATE TABLE IF NOT EXISTS replication_reconcile_events (
3065            node_id TEXT NOT NULL,
3066            last_applied_index INTEGER NOT NULL,
3067            commit_index INTEGER NOT NULL,
3068            replication_lag_ms INTEGER NOT NULL,
3069            created_at TEXT NOT NULL DEFAULT (datetime('now'))
3070        );
3071
3072        CREATE TABLE IF NOT EXISTS ha_runtime_markers (
3073            key TEXT PRIMARY KEY,
3074            value TEXT NOT NULL,
3075            updated_at TEXT NOT NULL DEFAULT (datetime('now'))
3076        );
3077
3078        CREATE TABLE IF NOT EXISTS ha_resilience_events (
3079            event_type TEXT NOT NULL,
3080            duration_ms INTEGER,
3081            note TEXT,
3082            created_at_unix_ms INTEGER NOT NULL
3083        );
3084
3085        CREATE TABLE IF NOT EXISTS ha_chaos_events (
3086            action TEXT NOT NULL,
3087            scenario TEXT NOT NULL,
3088            created_at_unix_ms INTEGER NOT NULL
3089        );
3090        ",
3091    )?;
3092    Ok(())
3093}
3094
3095fn append_replication_entry_to_store(
3096    db_path: &Path,
3097    entry: &ReplicationLogEntry,
3098    committed: bool,
3099) -> Result<()> {
3100    let conn = Connection::open(db_path)?;
3101    ensure_replication_catalog(&conn)?;
3102    conn.execute(
3103        "
3104        INSERT OR REPLACE INTO replication_log
3105            (idx, term, leader_id, operation, payload_json, checksum, created_at_unix_ms, committed)
3106        VALUES
3107            (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
3108        ",
3109        params![
3110            entry.index,
3111            entry.term,
3112            entry.leader_id,
3113            entry.operation,
3114            serde_json::to_string(&entry.payload)?,
3115            entry.checksum,
3116            entry.created_at_unix_ms,
3117            if committed { 1 } else { 0 },
3118        ],
3119    )?;
3120    Ok(())
3121}
3122
3123fn rewrite_replication_log_store(
3124    db_path: &Path,
3125    log: &ReplicationLog,
3126    commit_index: u64,
3127) -> Result<()> {
3128    let mut conn = Connection::open(db_path)?;
3129    ensure_replication_catalog(&conn)?;
3130    let tx = conn.transaction()?;
3131    tx.execute("DELETE FROM replication_log", [])?;
3132
3133    for entry in log.entries() {
3134        tx.execute(
3135            "
3136            INSERT INTO replication_log
3137                (idx, term, leader_id, operation, payload_json, checksum, created_at_unix_ms, committed)
3138            VALUES
3139                (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
3140            ",
3141            params![
3142                entry.index,
3143                entry.term,
3144                entry.leader_id,
3145                entry.operation,
3146                serde_json::to_string(&entry.payload)?,
3147                entry.checksum,
3148                entry.created_at_unix_ms,
3149                if entry.index <= commit_index { 1 } else { 0 },
3150            ],
3151        )?;
3152    }
3153
3154    tx.commit()?;
3155    Ok(())
3156}
3157
3158fn mark_committed_replication_entries(db_path: &Path, commit_index: u64) -> Result<()> {
3159    let conn = Connection::open(db_path)?;
3160    ensure_replication_catalog(&conn)?;
3161    conn.execute(
3162        "UPDATE replication_log SET committed = 1 WHERE idx <= ?1",
3163        params![commit_index],
3164    )?;
3165    Ok(())
3166}
3167
3168fn persist_vote_event(
3169    db_path: &Path,
3170    term: u64,
3171    voter_id: &str,
3172    candidate_id: &str,
3173    granted: bool,
3174    reason: &str,
3175) -> Result<()> {
3176    let conn = Connection::open(db_path)?;
3177    ensure_replication_catalog(&conn)?;
3178    conn.execute(
3179        "
3180        INSERT INTO election_votes (term, voter_id, candidate_id, granted, reason)
3181        VALUES (?1, ?2, ?3, ?4, ?5)
3182        ",
3183        params![
3184            term,
3185            voter_id,
3186            candidate_id,
3187            if granted { 1 } else { 0 },
3188            reason
3189        ],
3190    )?;
3191    Ok(())
3192}
3193
3194fn persist_reconcile_event(
3195    db_path: &Path,
3196    node_id: &str,
3197    last_applied_index: u64,
3198    commit_index: u64,
3199    replication_lag_ms: u64,
3200) -> Result<()> {
3201    let conn = Connection::open(db_path)?;
3202    ensure_replication_catalog(&conn)?;
3203    conn.execute(
3204        "
3205        INSERT INTO replication_reconcile_events (node_id, last_applied_index, commit_index, replication_lag_ms)
3206        VALUES (?1, ?2, ?3, ?4)
3207        ",
3208        params![node_id, last_applied_index, commit_index, replication_lag_ms],
3209    )?;
3210    Ok(())
3211}
3212
3213fn persist_runtime_marker(db_path: &Path, key: &str, value: &str) -> Result<()> {
3214    let conn = Connection::open(db_path)?;
3215    ensure_replication_catalog(&conn)?;
3216    conn.execute(
3217        "
3218        INSERT INTO ha_runtime_markers (key, value, updated_at)
3219        VALUES (?1, ?2, datetime('now'))
3220        ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = datetime('now')
3221        ",
3222        params![key, value],
3223    )?;
3224    Ok(())
3225}
3226
3227fn persist_resilience_event(
3228    db_path: &Path,
3229    event_type: &str,
3230    duration_ms: Option<u64>,
3231    note: &str,
3232) -> Result<()> {
3233    let conn = Connection::open(db_path)?;
3234    ensure_replication_catalog(&conn)?;
3235    conn.execute(
3236        "
3237        INSERT INTO ha_resilience_events (event_type, duration_ms, note, created_at_unix_ms)
3238        VALUES (?1, ?2, ?3, ?4)
3239        ",
3240        params![
3241            event_type,
3242            duration_ms.map(|value| value as i64),
3243            note,
3244            unix_ms_now() as i64,
3245        ],
3246    )?;
3247    Ok(())
3248}
3249
3250fn persist_chaos_event(db_path: &Path, action: &str, scenario: &str) -> Result<()> {
3251    let conn = Connection::open(db_path)?;
3252    ensure_replication_catalog(&conn)?;
3253    conn.execute(
3254        "
3255        INSERT INTO ha_chaos_events (action, scenario, created_at_unix_ms)
3256        VALUES (?1, ?2, ?3)
3257        ",
3258        params![action, scenario, unix_ms_now() as i64],
3259    )?;
3260    Ok(())
3261}
3262
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
fn unix_ms_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => 0,
    }
}
3269
/// Returns the reason phrase to put after `status` in an HTTP/1.1 status line.
///
/// Codes without a dedicated entry get a generic phrase for their class, so
/// an error status is never labelled "OK".
fn http_reason_phrase(status: u16) -> &'static str {
    match status {
        200 => "OK",
        400 => "Bad Request",
        401 => "Unauthorized",
        403 => "Forbidden",
        404 => "Not Found",
        405 => "Method Not Allowed",
        409 => "Conflict",
        500 => "Internal Server Error",
        503 => "Service Unavailable",
        507 => "Insufficient Storage",
        100..=199 => "Informational",
        // Unlisted 2xx codes keep the previous "OK" label.
        200..=299 => "OK",
        300..=399 => "Redirection",
        400..=499 => "Client Error",
        500..=599 => "Server Error",
        _ => "Unknown",
    }
}

/// Writes a complete HTTP/1.1 response (status line, headers, body) to
/// `stream` and flushes it.
///
/// The response always carries `Content-Type`, a `Content-Length` equal to the
/// body's length in bytes, and `Connection: close`.
///
/// # Errors
/// Returns the underlying I/O error if writing or flushing the stream fails.
fn write_response(
    stream: &mut TcpStream,
    status: u16,
    content_type: &str,
    body: &str,
) -> std::io::Result<()> {
    let status_text = http_reason_phrase(status);

    let header = format!(
        "HTTP/1.1 {status} {status_text}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
        body.len()
    );
    stream.write_all(header.as_bytes())?;
    stream.write_all(body.as_bytes())?;
    stream.flush()?;
    Ok(())
}
3298
3299#[cfg(test)]
3300mod tests {
3301    use super::*;
3302    use crate::security::AuditLogger;
3303    use crate::{ChunkInput, RbacPolicy, RuntimeConfig};
3304    use serde_json::json;
3305    use tempfile::{NamedTempFile, tempdir};
3306
3307    fn make_request(
3308        method: &str,
3309        path: &str,
3310        body: Option<&str>,
3311        token: Option<&str>,
3312    ) -> HttpRequest {
3313        let mut headers = HashMap::new();
3314        let body_bytes = body.unwrap_or_default().as_bytes().to_vec();
3315        if !body_bytes.is_empty() {
3316            headers.insert("content-length".to_string(), body_bytes.len().to_string());
3317            headers.insert("content-type".to_string(), "application/json".to_string());
3318        }
3319        if let Some(value) = token {
3320            headers.insert("x-sqlrite-control-token".to_string(), value.to_string());
3321        }
3322
3323        HttpRequest {
3324            method: method.to_string(),
3325            path: path.to_string(),
3326            headers,
3327            body: body_bytes,
3328        }
3329    }
3330
3331    fn replication_primary_config() -> ServerConfig {
3332        let mut cfg = ServerConfig::default();
3333        cfg.ha_profile.replication.enabled = true;
3334        cfg.ha_profile.replication.role = ServerRole::Primary;
3335        cfg.ha_profile.replication.sync_ack_quorum = 2;
3336        cfg.ha_profile.replication.node_id = "node-a".to_string();
3337        cfg
3338    }
3339
3340    fn secure_server_config(audit_log_path: PathBuf) -> ServerConfig {
3341        ServerConfig {
3342            security: ServerSecurityConfig {
3343                secure_defaults: true,
3344                require_auth_context: true,
3345                policy: Some(RbacPolicy::default()),
3346                audit_log_path: Some(audit_log_path),
3347                ..ServerSecurityConfig::default()
3348            },
3349            ..ServerConfig::default()
3350        }
3351    }
3352
3353    #[test]
3354    fn parses_http_request_line() {
3355        assert_eq!(
3356            parse_http_request_line("GET /healthz HTTP/1.1\r\n"),
3357            Some(("GET", "/healthz"))
3358        );
3359        assert_eq!(parse_http_request_line(""), None);
3360    }
3361
3362    #[test]
3363    fn builds_health_response_with_ha_payload() -> Result<()> {
3364        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3365        db.ingest_chunk(&ChunkInput {
3366            id: "c1".to_string(),
3367            doc_id: "d1".to_string(),
3368            content: "health endpoint".to_string(),
3369            embedding: vec![1.0, 0.0],
3370            metadata: json!({"tenant": "acme"}),
3371            source: None,
3372        })?;
3373
3374        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3375            HaRuntimeProfile::default(),
3376        )));
3377        let request = make_request("GET", "/healthz", None, None);
3378        let (status, content_type, body) = build_response(
3379            &db,
3380            Path::new(":memory:"),
3381            DurabilityProfile::Balanced,
3382            &ServerConfig::default(),
3383            &state,
3384            &request,
3385        )?;
3386        assert_eq!(status, 200);
3387        assert_eq!(content_type, "application/json");
3388        assert!(body.contains("storage"));
3389        assert!(body.contains("ha"));
3390        Ok(())
3391    }
3392
3393    #[test]
3394    fn control_plane_requires_token_when_configured() -> Result<()> {
3395        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3396        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3397            HaRuntimeProfile::default(),
3398        )));
3399        let request = make_request("POST", "/control/v1/failover/start", Some("{}"), None);
3400        let config = ServerConfig {
3401            control_api_token: Some("secret".to_string()),
3402            ..ServerConfig::default()
3403        };
3404
3405        let (status, _content_type, body) = build_response(
3406            &db,
3407            Path::new(":memory:"),
3408            DurabilityProfile::Balanced,
3409            &config,
3410            &state,
3411            &request,
3412        )?;
3413        assert_eq!(status, 401);
3414        assert!(body.contains("unauthorized"));
3415        Ok(())
3416    }
3417
3418    #[test]
3419    fn sql_endpoint_rejects_non_post() -> Result<()> {
3420        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3421        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3422            HaRuntimeProfile::default(),
3423        )));
3424        let request = make_request("GET", "/v1/sql", None, None);
3425
3426        let (status, _content_type, body) = build_response(
3427            &db,
3428            Path::new(":memory:"),
3429            DurabilityProfile::Balanced,
3430            &ServerConfig::default(),
3431            &state,
3432            &request,
3433        )?;
3434        assert_eq!(status, 405);
3435        assert!(body.contains("method not allowed"));
3436        Ok(())
3437    }
3438
3439    #[test]
3440    fn sql_endpoint_reports_parse_error() -> Result<()> {
3441        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
3442        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
3443        db.ingest_chunk(&ChunkInput {
3444            id: "c1".to_string(),
3445            doc_id: "d1".to_string(),
3446            content: "agent memory".to_string(),
3447            embedding: vec![1.0, 0.0],
3448            metadata: json!({}),
3449            source: None,
3450        })?;
3451
3452        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3453            HaRuntimeProfile::default(),
3454        )));
3455        let request = make_request("POST", "/v1/sql", Some("{}"), None);
3456
3457        let (status, _content_type, body) = build_response(
3458            &db,
3459            db_file.path(),
3460            DurabilityProfile::Balanced,
3461            &ServerConfig::default(),
3462            &state,
3463            &request,
3464        )?;
3465        assert_eq!(status, 400);
3466        assert!(body.contains("statement"));
3467        Ok(())
3468    }
3469
3470    #[test]
3471    fn sql_endpoint_executes_search_statement() -> Result<()> {
3472        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
3473        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
3474        db.ingest_chunk(&ChunkInput {
3475            id: "search-sql-1".to_string(),
3476            doc_id: "doc-1".to_string(),
3477            content: "server sql search endpoint".to_string(),
3478            embedding: vec![1.0, 0.0],
3479            metadata: json!({"tenant": "demo"}),
3480            source: Some("docs/search-sql-1.md".to_string()),
3481        })?;
3482
3483        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3484            HaRuntimeProfile::default(),
3485        )));
3486        let body = json!({
3487            "statement": "SELECT chunk_id, doc_id, hybrid_score FROM SEARCH('server sql', vector('1,0'), 3, 0.65, 500, 'balanced', NULL, NULL) ORDER BY hybrid_score DESC, chunk_id ASC;"
3488        })
3489        .to_string();
3490        let request = make_request("POST", "/v1/sql", Some(&body), None);
3491
3492        let (status, _content_type, body) = build_response(
3493            &db,
3494            db_file.path(),
3495            DurabilityProfile::Balanced,
3496            &ServerConfig::default(),
3497            &state,
3498            &request,
3499        )?;
3500        assert_eq!(status, 200, "{body}");
3501        assert!(body.contains("\"chunk_id\":\"search-sql-1\""));
3502        Ok(())
3503    }
3504
3505    #[test]
3506    fn openapi_endpoint_exposes_query_and_grpc_paths() -> Result<()> {
3507        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3508        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3509            HaRuntimeProfile::default(),
3510        )));
3511        let request = make_request("GET", "/v1/openapi.json", None, None);
3512
3513        let (status, _content_type, body) = build_response(
3514            &db,
3515            Path::new(":memory:"),
3516            DurabilityProfile::Balanced,
3517            &ServerConfig::default(),
3518            &state,
3519            &request,
3520        )?;
3521        assert_eq!(status, 200);
3522        assert!(body.contains("\"/v1/query\""));
3523        assert!(body.contains("\"/v1/query-compact\""));
3524        assert!(body.contains("\"/grpc/sqlrite.v1.QueryService/Query\""));
3525        assert!(body.contains("\"/grpc/sqlrite.v1.QueryService/Sql\""));
3526        Ok(())
3527    }
3528
3529    #[test]
3530    fn openapi_endpoint_hides_sql_paths_when_sql_endpoint_disabled() -> Result<()> {
3531        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3532        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3533            HaRuntimeProfile::default(),
3534        )));
3535        let request = make_request("GET", "/v1/openapi.json", None, None);
3536        let config = ServerConfig {
3537            enable_sql_endpoint: false,
3538            ..ServerConfig::default()
3539        };
3540
3541        let (status, _content_type, body) = build_response(
3542            &db,
3543            Path::new(":memory:"),
3544            DurabilityProfile::Balanced,
3545            &config,
3546            &state,
3547            &request,
3548        )?;
3549        assert_eq!(status, 200);
3550        assert!(!body.contains("\"/v1/sql\""));
3551        assert!(!body.contains("\"/v1/query\""));
3552        assert!(!body.contains("\"/grpc/sqlrite.v1.QueryService/Sql\""));
3553        assert!(!body.contains("\"/grpc/sqlrite.v1.QueryService/Query\""));
3554        assert!(body.contains("\"/v1/openapi.json\""));
3555        Ok(())
3556    }
3557
3558    #[test]
3559    fn security_endpoint_reports_secure_defaults_and_roles() -> Result<()> {
3560        let tmp = tempdir()?;
3561        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3562        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3563            HaRuntimeProfile::default(),
3564        )));
3565        let config = secure_server_config(tmp.path().join("audit.jsonl"));
3566        let request = make_request("GET", "/control/v1/security", None, None);
3567
3568        let (status, _, body) = build_response(
3569            &db,
3570            Path::new(":memory:"),
3571            DurabilityProfile::Balanced,
3572            &config,
3573            &state,
3574            &request,
3575        )?;
3576        assert_eq!(status, 200);
3577        assert!(body.contains("\"enabled\":true"));
3578        assert!(body.contains("\"require_auth_context\":true"));
3579        assert!(body.contains("tenant_admin"));
3580        Ok(())
3581    }
3582
3583    #[test]
3584    fn security_audit_export_endpoint_writes_filtered_jsonl() -> Result<()> {
3585        let tmp = tempdir()?;
3586        let audit_path = tmp.path().join("audit.jsonl");
3587        let export_path = tmp.path().join("export.jsonl");
3588        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3589        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3590            HaRuntimeProfile::default(),
3591        )));
3592        let mut config = secure_server_config(audit_path.clone());
3593        config.control_api_token = Some("secret".to_string());
3594
3595        let logger = JsonlAuditLogger::new(&audit_path, Vec::<String>::new())?;
3596        logger.log(&AuditEvent {
3597            unix_ms: 10,
3598            actor_id: "reader-1".to_string(),
3599            tenant_id: "acme".to_string(),
3600            operation: AccessOperation::Query,
3601            allowed: true,
3602            detail: json!({"path":"/v1/query"}),
3603        })?;
3604        logger.log(&AuditEvent {
3605            unix_ms: 20,
3606            actor_id: "admin-1".to_string(),
3607            tenant_id: "acme".to_string(),
3608            operation: AccessOperation::SqlAdmin,
3609            allowed: false,
3610            detail: json!({"path":"/v1/sql"}),
3611        })?;
3612
3613        let request = make_request(
3614            "POST",
3615            "/control/v1/security/audit/export",
3616            Some(&format!(
3617                "{{\"actor_id\":\"reader-1\",\"output_path\":\"{}\",\"format\":\"jsonl\"}}",
3618                export_path.display()
3619            )),
3620            Some("secret"),
3621        );
3622        let (status, _, body) = build_response(
3623            &db,
3624            Path::new(":memory:"),
3625            DurabilityProfile::Balanced,
3626            &config,
3627            &state,
3628            &request,
3629        )?;
3630        assert_eq!(status, 200);
3631        assert!(body.contains("\"matched_events\":1"));
3632        let export = std::fs::read_to_string(export_path)?;
3633        assert!(export.contains("reader-1"));
3634        assert!(!export.contains("admin-1"));
3635        Ok(())
3636    }
3637
3638    #[test]
3639    fn secure_query_requires_auth_context_headers() -> Result<()> {
3640        let tmp = tempdir()?;
3641        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3642        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3643            HaRuntimeProfile::default(),
3644        )));
3645        let config = secure_server_config(tmp.path().join("audit.jsonl"));
3646        let request = make_request(
3647            "POST",
3648            "/v1/query",
3649            Some(r#"{"query_text":"agent","top_k":1}"#),
3650            None,
3651        );
3652
3653        let (status, _, body) = build_response(
3654            &db,
3655            Path::new(":memory:"),
3656            DurabilityProfile::Balanced,
3657            &config,
3658            &state,
3659            &request,
3660        )?;
3661        assert_eq!(status, 403);
3662        assert!(body.contains("missing auth context"));
3663        Ok(())
3664    }
3665
3666    #[test]
3667    fn secure_query_enforces_tenant_headers_and_audits() -> Result<()> {
3668        let tmp = tempdir()?;
3669        let audit_path = tmp.path().join("audit.jsonl");
3670        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3671        db.ingest_chunk(&ChunkInput {
3672            id: "tenant-1".to_string(),
3673            doc_id: "doc-1".to_string(),
3674            content: "tenant scoped agent memory".to_string(),
3675            embedding: vec![1.0, 0.0],
3676            metadata: json!({"tenant": "acme"}),
3677            source: None,
3678        })?;
3679        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3680            HaRuntimeProfile::default(),
3681        )));
3682        let config = secure_server_config(audit_path.clone());
3683
3684        let mut ok_request = make_request(
3685            "POST",
3686            "/v1/query",
3687            Some(r#"{"query_text":"agent","top_k":1}"#),
3688            None,
3689        );
3690        ok_request
3691            .headers
3692            .insert("x-sqlrite-actor-id".to_string(), "reader-1".to_string());
3693        ok_request
3694            .headers
3695            .insert("x-sqlrite-tenant-id".to_string(), "acme".to_string());
3696        ok_request
3697            .headers
3698            .insert("x-sqlrite-roles".to_string(), "reader".to_string());
3699
3700        let (status, _, body) = build_response(
3701            &db,
3702            Path::new(":memory:"),
3703            DurabilityProfile::Balanced,
3704            &config,
3705            &state,
3706            &ok_request,
3707        )?;
3708        assert_eq!(status, 200);
3709        assert!(body.contains("\"row_count\":1"));
3710
3711        let mut denied_request = make_request(
3712            "POST",
3713            "/v1/query",
3714            Some(r#"{"query_text":"agent","top_k":1,"metadata_filters":{"tenant":"beta"}}"#),
3715            None,
3716        );
3717        denied_request
3718            .headers
3719            .insert("x-sqlrite-actor-id".to_string(), "reader-1".to_string());
3720        denied_request
3721            .headers
3722            .insert("x-sqlrite-tenant-id".to_string(), "acme".to_string());
3723        denied_request
3724            .headers
3725            .insert("x-sqlrite-roles".to_string(), "reader".to_string());
3726
3727        let (status, _, body) = build_response(
3728            &db,
3729            Path::new(":memory:"),
3730            DurabilityProfile::Balanced,
3731            &config,
3732            &state,
3733            &denied_request,
3734        )?;
3735        assert_eq!(status, 403);
3736        assert!(body.contains("tenant filter mismatch"));
3737
3738        let audit = std::fs::read_to_string(audit_path)?;
3739        assert!(audit.contains("\"allowed\":true"));
3740        assert!(audit.contains("\"allowed\":false"));
3741        Ok(())
3742    }
3743
3744    #[test]
3745    fn rerank_hook_endpoint_returns_scored_candidates() -> Result<()> {
3746        let tmp = tempdir()?;
3747        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3748        db.ingest_chunk(&ChunkInput {
3749            id: "r1".to_string(),
3750            doc_id: "doc-1".to_string(),
3751            content: "rerank candidate agent memory".to_string(),
3752            embedding: vec![1.0, 0.0],
3753            metadata: json!({"tenant": "acme"}),
3754            source: None,
3755        })?;
3756        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3757            HaRuntimeProfile::default(),
3758        )));
3759        let config = secure_server_config(tmp.path().join("audit.jsonl"));
3760
3761        let mut request = make_request(
3762            "POST",
3763            "/v1/rerank-hook",
3764            Some(r#"{"query_text":"agent","candidate_count":5}"#),
3765            None,
3766        );
3767        request
3768            .headers
3769            .insert("x-sqlrite-actor-id".to_string(), "reader-1".to_string());
3770        request
3771            .headers
3772            .insert("x-sqlrite-tenant-id".to_string(), "acme".to_string());
3773        request
3774            .headers
3775            .insert("x-sqlrite-roles".to_string(), "reader".to_string());
3776
3777        let (status, _, body) = build_response(
3778            &db,
3779            Path::new(":memory:"),
3780            DurabilityProfile::Balanced,
3781            &config,
3782            &state,
3783            &request,
3784        )?;
3785        assert_eq!(status, 200);
3786        assert!(body.contains("\"kind\":\"rerank_hook\""));
3787        assert!(body.contains("\"vector_score\""));
3788        assert!(body.contains("\"text_score\""));
3789        Ok(())
3790    }
3791
3792    #[test]
3793    fn secure_sql_requires_admin_role() -> Result<()> {
3794        let tmp = tempdir()?;
3795        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
3796        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
3797        db.ingest_chunk(&ChunkInput {
3798            id: "sql-1".to_string(),
3799            doc_id: "doc-1".to_string(),
3800            content: "sql secure query".to_string(),
3801            embedding: vec![1.0, 0.0],
3802            metadata: json!({"tenant": "acme"}),
3803            source: None,
3804        })?;
3805        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3806            HaRuntimeProfile::default(),
3807        )));
3808        let config = secure_server_config(tmp.path().join("audit.jsonl"));
3809
3810        let mut reader_request = make_request(
3811            "POST",
3812            "/v1/sql",
3813            Some(r#"{"statement":"SELECT id FROM chunks ORDER BY id ASC LIMIT 1;"}"#),
3814            None,
3815        );
3816        reader_request
3817            .headers
3818            .insert("x-sqlrite-actor-id".to_string(), "reader-1".to_string());
3819        reader_request
3820            .headers
3821            .insert("x-sqlrite-tenant-id".to_string(), "acme".to_string());
3822        reader_request
3823            .headers
3824            .insert("x-sqlrite-roles".to_string(), "reader".to_string());
3825
3826        let (status, _, body) = build_response(
3827            &db,
3828            db_file.path(),
3829            DurabilityProfile::Balanced,
3830            &config,
3831            &state,
3832            &reader_request,
3833        )?;
3834        assert_eq!(status, 403);
3835        assert!(body.contains("authorization denied"));
3836
3837        let mut admin_request = make_request(
3838            "POST",
3839            "/v1/sql",
3840            Some(r#"{"statement":"SELECT id FROM chunks ORDER BY id ASC LIMIT 1;"}"#),
3841            None,
3842        );
3843        admin_request
3844            .headers
3845            .insert("x-sqlrite-actor-id".to_string(), "admin-1".to_string());
3846        admin_request
3847            .headers
3848            .insert("x-sqlrite-tenant-id".to_string(), "acme".to_string());
3849        admin_request
3850            .headers
3851            .insert("x-sqlrite-roles".to_string(), "admin".to_string());
3852
3853        let (status, _, body) = build_response(
3854            &db,
3855            db_file.path(),
3856            DurabilityProfile::Balanced,
3857            &config,
3858            &state,
3859            &admin_request,
3860        )?;
3861        assert_eq!(status, 200);
3862        assert!(body.contains("sql-1"));
3863        Ok(())
3864    }
3865
3866    #[test]
3867    fn query_and_grpc_query_endpoints_return_results() -> Result<()> {
3868        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3869        db.ingest_chunk(&ChunkInput {
3870            id: "query-1".to_string(),
3871            doc_id: "doc-1".to_string(),
3872            content: "agent query endpoint".to_string(),
3873            embedding: vec![1.0, 0.0],
3874            metadata: json!({"tenant": "demo"}),
3875            source: None,
3876        })?;
3877        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3878            HaRuntimeProfile::default(),
3879        )));
3880        let config = ServerConfig::default();
3881
3882        let query_req = make_request(
3883            "POST",
3884            "/v1/query",
3885            Some(r#"{"query_text":"agent","top_k":1}"#),
3886            None,
3887        );
3888        let (status, _, body) = build_response(
3889            &db,
3890            Path::new(":memory:"),
3891            DurabilityProfile::Balanced,
3892            &config,
3893            &state,
3894            &query_req,
3895        )?;
3896        assert_eq!(status, 200);
3897        assert!(body.contains("\"kind\":\"query\""));
3898        assert!(body.contains("\"row_count\":1"));
3899
3900        let compact_req = make_request(
3901            "POST",
3902            "/v1/query-compact",
3903            Some(r#"{"query_text":"agent","top_k":1,"include_payloads":false}"#),
3904            None,
3905        );
3906        let (status, _, body) = build_response(
3907            &db,
3908            Path::new(":memory:"),
3909            DurabilityProfile::Balanced,
3910            &config,
3911            &state,
3912            &compact_req,
3913        )?;
3914        assert_eq!(status, 200);
3915        assert!(body.contains("\"kind\":\"query_compact\""));
3916        assert!(body.contains("\"chunk_ids\":[\"query-1\"]"));
3917
3918        let grpc_query_req = make_request(
3919            "POST",
3920            "/grpc/sqlrite.v1.QueryService/Query",
3921            Some(r#"{"query_text":"agent","top_k":1}"#),
3922            None,
3923        );
3924        let (status, _, body) = build_response(
3925            &db,
3926            Path::new(":memory:"),
3927            DurabilityProfile::Balanced,
3928            &config,
3929            &state,
3930            &grpc_query_req,
3931        )?;
3932        assert_eq!(status, 200);
3933        assert!(body.contains("\"kind\":\"query\""));
3934        Ok(())
3935    }
3936
3937    #[test]
3938    fn query_and_grpc_endpoints_reject_non_post_methods() -> Result<()> {
3939        let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3940        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3941            HaRuntimeProfile::default(),
3942        )));
3943        let config = ServerConfig::default();
3944
3945        let query_get = make_request("GET", "/v1/query", None, None);
3946        let (status, _, body) = build_response(
3947            &db,
3948            Path::new(":memory:"),
3949            DurabilityProfile::Balanced,
3950            &config,
3951            &state,
3952            &query_get,
3953        )?;
3954        assert_eq!(status, 405);
3955        assert!(body.contains("POST /v1/query"));
3956
3957        let compact_get = make_request("GET", "/v1/query-compact", None, None);
3958        let (status, _, body) = build_response(
3959            &db,
3960            Path::new(":memory:"),
3961            DurabilityProfile::Balanced,
3962            &config,
3963            &state,
3964            &compact_get,
3965        )?;
3966        assert_eq!(status, 405);
3967        assert!(body.contains("POST /v1/query-compact"));
3968
3969        let grpc_sql_get = make_request("GET", "/grpc/sqlrite.v1.QueryService/Sql", None, None);
3970        let (status, _, body) = build_response(
3971            &db,
3972            Path::new(":memory:"),
3973            DurabilityProfile::Balanced,
3974            &config,
3975            &state,
3976            &grpc_sql_get,
3977        )?;
3978        assert_eq!(status, 405);
3979        assert!(body.contains("POST /grpc/sqlrite.v1.QueryService/Sql"));
3980        Ok(())
3981    }
3982
3983    #[test]
3984    fn grpc_sql_endpoint_executes_sql_statement() -> Result<()> {
3985        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
3986        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
3987        db.ingest_chunk(&ChunkInput {
3988            id: "grpc-sql-1".to_string(),
3989            doc_id: "doc-1".to_string(),
3990            content: "grpc sql endpoint".to_string(),
3991            embedding: vec![1.0, 0.0],
3992            metadata: json!({}),
3993            source: None,
3994        })?;
3995        let state = Arc::new(Mutex::new(ControlPlaneState::new(
3996            HaRuntimeProfile::default(),
3997        )));
3998        let config = ServerConfig::default();
3999
4000        let grpc_sql_req = make_request(
4001            "POST",
4002            "/grpc/sqlrite.v1.QueryService/Sql",
4003            Some(r#"{"statement":"SELECT id, doc_id FROM chunks ORDER BY id ASC LIMIT 1;"}"#),
4004            None,
4005        );
4006        let (status, _, body) = build_response(
4007            &db,
4008            db_file.path(),
4009            DurabilityProfile::Balanced,
4010            &config,
4011            &state,
4012            &grpc_sql_req,
4013        )?;
4014        assert_eq!(status, 200);
4015        assert!(body.contains("\"kind\":\"query\""));
4016        assert!(body.contains("\"row_count\":1"));
4017        assert!(body.contains("grpc-sql-1"));
4018        Ok(())
4019    }
4020
4021    #[test]
4022    fn replication_append_and_ack_advances_commit_index() -> Result<()> {
4023        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4024        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4025        let config = replication_primary_config();
4026        let state = Arc::new(Mutex::new(ControlPlaneState::new(
4027            config.ha_profile.clone(),
4028        )));
4029
4030        let append = make_request(
4031            "POST",
4032            "/control/v1/replication/append",
4033            Some(r#"{"operation":"ingest_chunk","payload":{"id":"c1"}}"#),
4034            None,
4035        );
4036        let (status, _, body) = build_response(
4037            &db,
4038            db_file.path(),
4039            DurabilityProfile::Balanced,
4040            &config,
4041            &state,
4042            &append,
4043        )?;
4044        assert_eq!(status, 200);
4045        assert!(body.contains("commit_index"));
4046
4047        let ack = make_request(
4048            "POST",
4049            "/control/v1/replication/ack",
4050            Some(r#"{"node_id":"node-b","index":1}"#),
4051            None,
4052        );
4053        let (status, _, body) = build_response(
4054            &db,
4055            db_file.path(),
4056            DurabilityProfile::Balanced,
4057            &config,
4058            &state,
4059            &ack,
4060        )?;
4061        assert_eq!(status, 200);
4062        assert!(body.contains("\"commit_index\":1"));
4063        Ok(())
4064    }
4065
4066    #[test]
4067    fn election_vote_rejects_stale_term() -> Result<()> {
4068        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4069        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4070        let mut config = replication_primary_config();
4071        config.ha_profile.replication.role = ServerRole::Replica;
4072        config.ha_profile.replication.node_id = "node-b".to_string();
4073        let mut state_inner = ControlPlaneState::new(config.ha_profile.clone());
4074        state_inner.runtime.current_term = 5;
4075        state_inner.runtime.note_log_position(10, 5);
4076        let state = Arc::new(Mutex::new(state_inner));
4077
4078        let vote = make_request(
4079            "POST",
4080            "/control/v1/election/request-vote",
4081            Some(
4082                r#"{"term":4,"candidate_id":"node-a","candidate_last_log_index":10,"candidate_last_log_term":5}"#,
4083            ),
4084            None,
4085        );
4086
4087        let (status, _, body) = build_response(
4088            &db,
4089            db_file.path(),
4090            DurabilityProfile::Balanced,
4091            &config,
4092            &state,
4093            &vote,
4094        )?;
4095        assert_eq!(status, 200);
4096        assert!(body.contains("\"vote_granted\":false"));
4097        assert!(body.contains("stale_term"));
4098        Ok(())
4099    }
4100
4101    #[test]
4102    fn reconciliation_returns_missing_entries() -> Result<()> {
4103        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4104        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4105        let config = replication_primary_config();
4106        let state = Arc::new(Mutex::new(ControlPlaneState::new(
4107            config.ha_profile.clone(),
4108        )));
4109
4110        for i in 1..=3 {
4111            let append = make_request(
4112                "POST",
4113                "/control/v1/replication/append",
4114                Some(&format!(
4115                    "{{\"operation\":\"write\",\"payload\":{{\"n\":{i}}}}}"
4116                )),
4117                None,
4118            );
4119            let _ = build_response(
4120                &db,
4121                db_file.path(),
4122                DurabilityProfile::Balanced,
4123                &config,
4124                &state,
4125                &append,
4126            )?;
4127        }
4128
4129        let reconcile = make_request(
4130            "POST",
4131            "/control/v1/replication/reconcile",
4132            Some(r#"{"node_id":"node-c","last_applied_index":1}"#),
4133            None,
4134        );
4135        let (status, _, body) = build_response(
4136            &db,
4137            db_file.path(),
4138            DurabilityProfile::Balanced,
4139            &config,
4140            &state,
4141            &reconcile,
4142        )?;
4143        assert_eq!(status, 200);
4144        assert!(body.contains("missing_entries_count"));
4145        assert!(body.contains("\"index\":2"));
4146        Ok(())
4147    }
4148
4149    #[test]
4150    fn auto_failover_check_promotes_replica_when_forced() -> Result<()> {
4151        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4152        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4153        let mut config = replication_primary_config();
4154        config.ha_profile.replication.role = ServerRole::Replica;
4155        config.ha_profile.replication.node_id = "node-b".to_string();
4156        config.ha_profile.replication.failover_mode = FailoverMode::Automatic;
4157        let state = Arc::new(Mutex::new(ControlPlaneState::new(
4158            config.ha_profile.clone(),
4159        )));
4160
4161        let check = make_request(
4162            "POST",
4163            "/control/v1/failover/auto-check",
4164            Some(r#"{"force":true,"reason":"test_auto"}"#),
4165            None,
4166        );
4167        let (status, _, body) = build_response(
4168            &db,
4169            db_file.path(),
4170            DurabilityProfile::Balanced,
4171            &config,
4172            &state,
4173            &check,
4174        )?;
4175        assert_eq!(status, 200);
4176        assert!(body.contains("\"triggered\":true"));
4177        assert!(body.contains("\"role\":\"primary\""));
4178        Ok(())
4179    }
4180
4181    #[test]
4182    fn auto_failover_check_accepts_missing_force_field() -> Result<()> {
4183        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4184        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4185        let mut config = replication_primary_config();
4186        config.ha_profile.replication.role = ServerRole::Replica;
4187        config.ha_profile.replication.node_id = "node-b".to_string();
4188        config.ha_profile.replication.failover_mode = FailoverMode::Automatic;
4189        let state = Arc::new(Mutex::new(ControlPlaneState::new(
4190            config.ha_profile.clone(),
4191        )));
4192
4193        let check = make_request(
4194            "POST",
4195            "/control/v1/failover/auto-check",
4196            Some(r#"{"simulate_elapsed_ms":5000,"reason":"timeout-path"}"#),
4197            None,
4198        );
4199        let (status, _, body) = build_response(
4200            &db,
4201            db_file.path(),
4202            DurabilityProfile::Balanced,
4203            &config,
4204            &state,
4205            &check,
4206        )?;
4207        assert_eq!(status, 200);
4208        assert!(body.contains("\"triggered\":true"));
4209        assert!(body.contains("\"reason\":\"timeout-path\""));
4210        Ok(())
4211    }
4212
4213    #[test]
4214    fn chaos_disk_full_blocks_replication_append() -> Result<()> {
4215        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4216        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4217        let config = replication_primary_config();
4218        let state = Arc::new(Mutex::new(ControlPlaneState::new(
4219            config.ha_profile.clone(),
4220        )));
4221
4222        let inject = make_request(
4223            "POST",
4224            "/control/v1/chaos/inject",
4225            Some(r#"{"scenario":"disk_full"}"#),
4226            None,
4227        );
4228        let (status, _, _) = build_response(
4229            &db,
4230            db_file.path(),
4231            DurabilityProfile::Balanced,
4232            &config,
4233            &state,
4234            &inject,
4235        )?;
4236        assert_eq!(status, 200);
4237
4238        let append = make_request(
4239            "POST",
4240            "/control/v1/replication/append",
4241            Some(r#"{"operation":"ingest_chunk","payload":{"id":"c1"}}"#),
4242            None,
4243        );
4244        let (status, _, body) = build_response(
4245            &db,
4246            db_file.path(),
4247            DurabilityProfile::Balanced,
4248            &config,
4249            &state,
4250            &append,
4251        )?;
4252        assert_eq!(status, 507);
4253        assert!(body.contains("disk_full"));
4254        Ok(())
4255    }
4256
4257    #[test]
4258    fn chaos_node_crash_blocks_health_but_not_status_endpoint() -> Result<()> {
4259        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4260        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4261        let config = replication_primary_config();
4262        let state = Arc::new(Mutex::new(ControlPlaneState::new(
4263            config.ha_profile.clone(),
4264        )));
4265
4266        let inject = make_request(
4267            "POST",
4268            "/control/v1/chaos/inject",
4269            Some(r#"{"scenario":"node_crash"}"#),
4270            None,
4271        );
4272        let _ = build_response(
4273            &db,
4274            db_file.path(),
4275            DurabilityProfile::Balanced,
4276            &config,
4277            &state,
4278            &inject,
4279        )?;
4280
4281        let health = make_request("GET", "/readyz", None, None);
4282        let (status, _, body) = build_response(
4283            &db,
4284            db_file.path(),
4285            DurabilityProfile::Balanced,
4286            &config,
4287            &state,
4288            &health,
4289        )?;
4290        assert_eq!(status, 503);
4291        assert!(body.contains("node_crash"));
4292
4293        let status_req = make_request("GET", "/control/v1/chaos/status", None, None);
4294        let (status, _, body) = build_response(
4295            &db,
4296            db_file.path(),
4297            DurabilityProfile::Balanced,
4298            &config,
4299            &state,
4300            &status_req,
4301        )?;
4302        assert_eq!(status, 200);
4303        assert!(body.contains("active_fault_count"));
4304        Ok(())
4305    }
4306
4307    #[test]
4308    fn recovery_timing_is_recorded_between_start_and_mark_restored() -> Result<()> {
4309        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4310        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4311        let config = replication_primary_config();
4312        let state = Arc::new(Mutex::new(ControlPlaneState::new(
4313            config.ha_profile.clone(),
4314        )));
4315
4316        let start = make_request("POST", "/control/v1/recovery/start", Some(r#"{}"#), None);
4317        let (status, _, _) = build_response(
4318            &db,
4319            db_file.path(),
4320            DurabilityProfile::Balanced,
4321            &config,
4322            &state,
4323            &start,
4324        )?;
4325        assert_eq!(status, 200);
4326
4327        let done = make_request(
4328            "POST",
4329            "/control/v1/recovery/mark-restored",
4330            Some(r#"{"note":"restore_done"}"#),
4331            None,
4332        );
4333        let (status, _, _) = build_response(
4334            &db,
4335            db_file.path(),
4336            DurabilityProfile::Balanced,
4337            &config,
4338            &state,
4339            &done,
4340        )?;
4341        assert_eq!(status, 200);
4342
4343        let resilience = make_request("GET", "/control/v1/resilience", None, None);
4344        let (status, _, body) = build_response(
4345            &db,
4346            db_file.path(),
4347            DurabilityProfile::Balanced,
4348            &config,
4349            &state,
4350            &resilience,
4351        )?;
4352        assert_eq!(status, 200);
4353        assert!(body.contains("restore_completed_total"));
4354        assert!(body.contains("last_restore_duration_ms"));
4355        Ok(())
4356    }
4357
4358    #[test]
4359    fn recovery_snapshot_verify_and_prune_endpoints_work() -> Result<()> {
4360        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4361        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4362        db.ingest_chunk(&ChunkInput {
4363            id: "c1".to_string(),
4364            doc_id: "d1".to_string(),
4365            content: "snapshot payload".to_string(),
4366            embedding: vec![1.0, 0.0],
4367            metadata: json!({}),
4368            source: None,
4369        })?;
4370        let backup_root = tempdir().map_err(std::io::Error::other)?;
4371        let mut config = replication_primary_config();
4372        config.ha_profile.recovery.backup_dir = backup_root.path().display().to_string();
4373        config.ha_profile.recovery.snapshot_interval_seconds = 1;
4374        config.ha_profile.recovery.pitr_retention_seconds = 60;
4375        let state = Arc::new(Mutex::new(ControlPlaneState::new(
4376            config.ha_profile.clone(),
4377        )));
4378
4379        let snapshot_req = make_request(
4380            "POST",
4381            "/control/v1/recovery/snapshot",
4382            Some(r#"{"note":"s17_snapshot"}"#),
4383            None,
4384        );
4385        let (status, _, body) = build_response(
4386            &db,
4387            db_file.path(),
4388            DurabilityProfile::Balanced,
4389            &config,
4390            &state,
4391            &snapshot_req,
4392        )?;
4393        assert_eq!(status, 200);
4394        assert!(body.contains("snapshot_id"));
4395
4396        let list_req = make_request("GET", "/control/v1/recovery/snapshots?limit=10", None, None);
4397        let (status, _, body) = build_response(
4398            &db,
4399            db_file.path(),
4400            DurabilityProfile::Balanced,
4401            &config,
4402            &state,
4403            &list_req,
4404        )?;
4405        assert_eq!(status, 200);
4406        assert!(body.contains("\"count\":1"));
4407
4408        let verify_req = make_request(
4409            "POST",
4410            "/control/v1/recovery/verify-restore",
4411            Some(r#"{"keep_artifact":false}"#),
4412            None,
4413        );
4414        let (status, _, body) = build_response(
4415            &db,
4416            db_file.path(),
4417            DurabilityProfile::Balanced,
4418            &config,
4419            &state,
4420            &verify_req,
4421        )?;
4422        assert_eq!(status, 200);
4423        assert!(body.contains("\"restore_verified\":true"));
4424
4425        let prune_req = make_request(
4426            "POST",
4427            "/control/v1/recovery/prune-snapshots",
4428            Some(r#"{"retention_seconds":0}"#),
4429            None,
4430        );
4431        let (status, _, body) = build_response(
4432            &db,
4433            db_file.path(),
4434            DurabilityProfile::Balanced,
4435            &config,
4436            &state,
4437            &prune_req,
4438        )?;
4439        assert_eq!(status, 200);
4440        assert!(body.contains("removed_count"));
4441        Ok(())
4442    }
4443
4444    #[test]
4445    fn observability_metrics_and_trace_endpoints_report_sql_activity() -> Result<()> {
4446        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4447        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4448        db.ingest_chunk(&ChunkInput {
4449            id: "q1".to_string(),
4450            doc_id: "d1".to_string(),
4451            content: "query trace".to_string(),
4452            embedding: vec![1.0, 0.0],
4453            metadata: json!({}),
4454            source: None,
4455        })?;
4456        let config = ServerConfig::default();
4457        let state = Arc::new(Mutex::new(ControlPlaneState::new(
4458            config.ha_profile.clone(),
4459        )));
4460
4461        let sql_req = make_request(
4462            "POST",
4463            "/v1/sql",
4464            Some(r#"{"statement":"SELECT id FROM chunks ORDER BY id ASC LIMIT 1;"}"#),
4465            None,
4466        );
4467        let started = unix_ms_now();
4468        let (status, _, _) = build_response(
4469            &db,
4470            db_file.path(),
4471            DurabilityProfile::Balanced,
4472            &config,
4473            &state,
4474            &sql_req,
4475        )?;
4476        let duration_ms = unix_ms_now().saturating_sub(started);
4477        {
4478            let mut guard = state
4479                .lock()
4480                .map_err(|_| std::io::Error::other("state lock poisoned"))?;
4481            guard
4482                .observability
4483                .record_request("POST", "/v1/sql", status, duration_ms);
4484        }
4485
4486        let trace_req = make_request("GET", "/control/v1/traces/recent?limit=5", None, None);
4487        let (status, _, body) = build_response(
4488            &db,
4489            db_file.path(),
4490            DurabilityProfile::Balanced,
4491            &config,
4492            &state,
4493            &trace_req,
4494        )?;
4495        assert_eq!(status, 200);
4496        assert!(body.contains("\"/v1/sql\""));
4497
4498        let map_req = make_request("GET", "/control/v1/observability/metrics-map", None, None);
4499        let (status, _, body) = build_response(
4500            &db,
4501            db_file.path(),
4502            DurabilityProfile::Balanced,
4503            &config,
4504            &state,
4505            &map_req,
4506        )?;
4507        assert_eq!(status, 200);
4508        assert!(body.contains("sqlrite_requests_sql_total"));
4509        Ok(())
4510    }
4511
4512    #[test]
4513    fn alert_simulation_and_slo_report_reflect_thresholds() -> Result<()> {
4514        let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4515        let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4516        let mut config = replication_primary_config();
4517        config.ha_profile.replication.max_replication_lag_ms = 50;
4518        let state = Arc::new(Mutex::new(ControlPlaneState::new(
4519            config.ha_profile.clone(),
4520        )));
4521
4522        {
4523            let mut guard = state
4524                .lock()
4525                .map_err(|_| std::io::Error::other("state lock poisoned"))?;
4526            guard.runtime.replication_lag_ms = 120;
4527            guard.observability.record_request("GET", "/readyz", 200, 1);
4528            guard.observability.record_request("GET", "/readyz", 503, 1);
4529        }
4530
4531        let simulate_req = make_request(
4532            "POST",
4533            "/control/v1/alerts/simulate",
4534            Some(r#"{"sql_error_rate":0.20,"sql_avg_latency_ms":75.0,"replication_lag_ms":120}"#),
4535            None,
4536        );
4537        let (status, _, body) = build_response(
4538            &db,
4539            db_file.path(),
4540            DurabilityProfile::Balanced,
4541            &config,
4542            &state,
4543            &simulate_req,
4544        )?;
4545        assert_eq!(status, 200);
4546        assert!(body.contains("replication_lag_high"));
4547        assert!(body.contains("sql_error_rate_high"));
4548
4549        let slo_req = make_request("GET", "/control/v1/slo/report", None, None);
4550        let (status, _, body) = build_response(
4551            &db,
4552            db_file.path(),
4553            DurabilityProfile::Balanced,
4554            &config,
4555            &state,
4556            &slo_req,
4557        )?;
4558        assert_eq!(status, 200);
4559        assert!(body.contains("target_percent"));
4560        assert!(body.contains("target_seconds"));
4561
4562        let reset_req = make_request(
4563            "POST",
4564            "/control/v1/observability/reset",
4565            Some(r#"{}"#),
4566            None,
4567        );
4568        let (status, _, body) = build_response(
4569            &db,
4570            db_file.path(),
4571            DurabilityProfile::Balanced,
4572            &config,
4573            &state,
4574            &reset_req,
4575        )?;
4576        assert_eq!(status, 200);
4577        assert!(body.contains("\"requests_total\":0"));
4578        Ok(())
4579    }
4580}