1use crate::security::{AccessPolicy, AuditLogger};
2use crate::{
3 AccessContext, AccessOperation, AuditEvent, AuditExportFormat, AuditQuery, DurabilityProfile,
4 FailoverMode, HaRuntimeProfile, HaRuntimeState, JsonlAuditLogger, QueryProfile, RbacPolicy,
5 ReplicationLog, ReplicationLogEntry, Result, RuntimeConfig, ServerRole, SqlRite, SqlRiteError,
6 build_health_report, create_backup_snapshot, execute_sdk_query, execute_sdk_sql,
7 export_audit_events, list_backup_snapshots, prune_backup_snapshots,
8 restore_backup_file_verified, select_backup_snapshot_for_time,
9};
10use rusqlite::{Connection, OptionalExtension, params};
11use serde::Deserialize;
12use serde_json::{Value, json};
13use sqlrite_sdk_core::{QueryRequest as QueryApiRequest, SqlRequest as SqlApiRequest};
14use std::collections::{HashMap, VecDeque};
15use std::io::{BufRead, BufReader, Read, Write};
16use std::net::{TcpListener, TcpStream};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19use std::time::{SystemTime, UNIX_EPOCH};
20
21#[derive(Debug, Clone)]
22pub struct ServerConfig {
23 pub bind_addr: String,
24 pub ha_profile: HaRuntimeProfile,
25 pub control_api_token: Option<String>,
26 pub enable_sql_endpoint: bool,
27 pub security: ServerSecurityConfig,
28}
29
30#[derive(Debug, Clone)]
31pub struct ServerSecurityConfig {
32 pub secure_defaults: bool,
33 pub require_auth_context: bool,
34 pub policy: Option<RbacPolicy>,
35 pub audit_log_path: Option<PathBuf>,
36 pub audit_redacted_fields: Vec<String>,
37}
38
39impl Default for ServerSecurityConfig {
40 fn default() -> Self {
41 Self {
42 secure_defaults: false,
43 require_auth_context: false,
44 policy: None,
45 audit_log_path: None,
46 audit_redacted_fields: vec![
47 "statement".to_string(),
48 "query_embedding".to_string(),
49 "metadata_filters".to_string(),
50 "auth_token".to_string(),
51 ],
52 }
53 }
54}
55
56impl ServerSecurityConfig {
57 fn enabled(&self) -> bool {
58 self.secure_defaults
59 || self.require_auth_context
60 || self.policy.is_some()
61 || self.audit_log_path.is_some()
62 }
63}
64
65impl Default for ServerConfig {
66 fn default() -> Self {
67 Self {
68 bind_addr: "127.0.0.1:8099".to_string(),
69 ha_profile: HaRuntimeProfile::default(),
70 control_api_token: None,
71 enable_sql_endpoint: true,
72 security: ServerSecurityConfig::default(),
73 }
74 }
75}
76
77#[derive(Debug, Clone)]
78struct ControlPlaneState {
79 profile: HaRuntimeProfile,
80 runtime: HaRuntimeState,
81 replication_log: ReplicationLog,
82 replica_progress: HashMap<String, u64>,
83 resilience: ResilienceState,
84 chaos: ChaosHarnessState,
85 observability: ObservabilityState,
86}
87
88impl ControlPlaneState {
89 fn new(profile: HaRuntimeProfile) -> Self {
90 let runtime = HaRuntimeState::new(&profile);
91 Self {
92 profile,
93 runtime,
94 replication_log: ReplicationLog::new(),
95 replica_progress: HashMap::new(),
96 resilience: ResilienceState::default(),
97 chaos: ChaosHarnessState::default(),
98 observability: ObservabilityState::default(),
99 }
100 }
101
102 fn snapshot_json(&self) -> Value {
103 json!({
104 "profile": self.profile,
105 "state": self.runtime,
106 "replication": {
107 "log_len": self.replication_log.len(),
108 "last_log_index": self.replication_log.last_index(),
109 "last_log_term": self.replication_log.last_term(),
110 "replica_progress": self.replica_progress,
111 },
112 "resilience": self.resilience.to_json(self.chaos.active_count()),
113 "chaos": self.chaos.to_json(),
114 "observability": self.observability.to_json(),
115 })
116 }
117}
118
119#[derive(Debug, Clone, serde::Serialize)]
120struct QueryTraceRecord {
121 seq: u64,
122 timestamp_unix_ms: u64,
123 method: String,
124 path: String,
125 status: u16,
126 duration_ms: u64,
127}
128
129#[derive(Debug, Clone)]
130struct ObservabilityState {
131 request_seq: u64,
132 requests_total: u64,
133 requests_server_errors_total: u64,
134 requests_client_errors_total: u64,
135 sql_requests_total: u64,
136 sql_requests_failed_total: u64,
137 sql_latency_total_ms: u128,
138 sql_latency_max_ms: u64,
139 alert_simulations_total: u64,
140 traces: VecDeque<QueryTraceRecord>,
141 trace_capacity: usize,
142}
143
144impl Default for ObservabilityState {
145 fn default() -> Self {
146 Self {
147 request_seq: 0,
148 requests_total: 0,
149 requests_server_errors_total: 0,
150 requests_client_errors_total: 0,
151 sql_requests_total: 0,
152 sql_requests_failed_total: 0,
153 sql_latency_total_ms: 0,
154 sql_latency_max_ms: 0,
155 alert_simulations_total: 0,
156 traces: VecDeque::new(),
157 trace_capacity: 512,
158 }
159 }
160}
161
162impl ObservabilityState {
163 fn record_request(&mut self, method: &str, path: &str, status: u16, duration_ms: u64) {
164 self.request_seq = self.request_seq.saturating_add(1);
165 self.requests_total = self.requests_total.saturating_add(1);
166 if status >= 500 {
167 self.requests_server_errors_total = self.requests_server_errors_total.saturating_add(1);
168 } else if status >= 400 {
169 self.requests_client_errors_total = self.requests_client_errors_total.saturating_add(1);
170 }
171
172 let (raw_path, _) = split_path_and_query(path);
173 if matches!(
174 raw_path,
175 "/v1/sql"
176 | "/v1/query"
177 | "/v1/query-compact"
178 | "/grpc/sqlrite.v1.QueryService/Query"
179 | "/grpc/sqlrite.v1.QueryService/Sql"
180 ) {
181 self.sql_requests_total = self.sql_requests_total.saturating_add(1);
182 self.sql_latency_total_ms = self
183 .sql_latency_total_ms
184 .saturating_add(duration_ms as u128);
185 self.sql_latency_max_ms = self.sql_latency_max_ms.max(duration_ms);
186 if status >= 400 {
187 self.sql_requests_failed_total = self.sql_requests_failed_total.saturating_add(1);
188 }
189 }
190
191 self.traces.push_back(QueryTraceRecord {
192 seq: self.request_seq,
193 timestamp_unix_ms: unix_ms_now(),
194 method: method.to_string(),
195 path: raw_path.to_string(),
196 status,
197 duration_ms,
198 });
199 while self.traces.len() > self.trace_capacity {
200 let _ = self.traces.pop_front();
201 }
202 }
203
204 fn sql_avg_latency_ms(&self) -> f64 {
205 if self.sql_requests_total == 0 {
206 return 0.0;
207 }
208 self.sql_latency_total_ms as f64 / self.sql_requests_total as f64
209 }
210
211 fn recent_traces(&self, limit: usize) -> Vec<QueryTraceRecord> {
212 let take = limit.max(1).min(self.trace_capacity);
213 self.traces.iter().rev().take(take).cloned().collect()
214 }
215
216 fn to_json(&self) -> Value {
217 json!({
218 "requests_total": self.requests_total,
219 "requests_server_errors_total": self.requests_server_errors_total,
220 "requests_client_errors_total": self.requests_client_errors_total,
221 "sql_requests_total": self.sql_requests_total,
222 "sql_requests_failed_total": self.sql_requests_failed_total,
223 "sql_avg_latency_ms": self.sql_avg_latency_ms(),
224 "sql_latency_max_ms": self.sql_latency_max_ms,
225 "alert_simulations_total": self.alert_simulations_total,
226 "trace_buffered": self.traces.len(),
227 "trace_capacity": self.trace_capacity,
228 })
229 }
230
231 fn reset(&mut self) {
232 self.requests_total = 0;
233 self.requests_server_errors_total = 0;
234 self.requests_client_errors_total = 0;
235 self.sql_requests_total = 0;
236 self.sql_requests_failed_total = 0;
237 self.sql_latency_total_ms = 0;
238 self.sql_latency_max_ms = 0;
239 self.traces.clear();
240 }
241}
242
243#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, serde::Serialize)]
244#[serde(rename_all = "snake_case")]
245enum ChaosScenario {
246 NodeCrash,
247 DiskFull,
248 PartitionSubset,
249}
250
251impl ChaosScenario {
252 fn as_str(self) -> &'static str {
253 match self {
254 Self::NodeCrash => "node_crash",
255 Self::DiskFull => "disk_full",
256 Self::PartitionSubset => "partition_subset",
257 }
258 }
259}
260
261#[derive(Debug, Clone, Default, serde::Serialize)]
262struct ChaosFaultState {
263 scenario: String,
264 injected_at_unix_ms: u64,
265 duration_ms: Option<u64>,
266 note: Option<String>,
267 blocked_nodes: Vec<String>,
268}
269
270#[derive(Debug, Clone, Default)]
271struct ChaosHarnessState {
272 active_faults: HashMap<ChaosScenario, ChaosFaultState>,
273}
274
275impl ChaosHarnessState {
276 fn inject(&mut self, request: ChaosInjectRequest) {
277 let scenario = request.scenario;
278 self.active_faults.insert(
279 scenario,
280 ChaosFaultState {
281 scenario: scenario.as_str().to_string(),
282 injected_at_unix_ms: unix_ms_now(),
283 duration_ms: request.duration_ms,
284 note: request.note,
285 blocked_nodes: request.blocked_nodes.unwrap_or_default(),
286 },
287 );
288 }
289
290 fn clear(&mut self, scenario: Option<ChaosScenario>) {
291 if let Some(value) = scenario {
292 self.active_faults.remove(&value);
293 } else {
294 self.active_faults.clear();
295 }
296 }
297
298 fn cleanup_expired(&mut self, now_ms: u64) {
299 self.active_faults.retain(|_, fault| {
300 let Some(duration_ms) = fault.duration_ms else {
301 return true;
302 };
303 now_ms.saturating_sub(fault.injected_at_unix_ms) < duration_ms
304 });
305 }
306
307 fn has(&self, scenario: ChaosScenario) -> bool {
308 self.active_faults.contains_key(&scenario)
309 }
310
311 fn active_count(&self) -> usize {
312 self.active_faults.len()
313 }
314
315 fn to_json(&self) -> Value {
316 let faults = self
317 .active_faults
318 .values()
319 .cloned()
320 .map(|fault| serde_json::to_value(fault).unwrap_or(Value::Null))
321 .collect::<Vec<_>>();
322 json!({
323 "active_fault_count": faults.len(),
324 "faults": faults,
325 })
326 }
327}
328
329#[derive(Debug, Clone, Default)]
330struct ResilienceState {
331 failover_events_total: u64,
332 failover_completed_total: u64,
333 active_failover_started_unix_ms: Option<u64>,
334 last_failover_duration_ms: Option<u64>,
335 cumulative_failover_duration_ms: u128,
336 restore_events_total: u64,
337 restore_completed_total: u64,
338 active_restore_started_unix_ms: Option<u64>,
339 last_restore_duration_ms: Option<u64>,
340 cumulative_restore_duration_ms: u128,
341 chaos_injections_total: u64,
342 chaos_blocked_requests_total: u64,
343}
344
345impl ResilienceState {
346 fn start_failover(&mut self) {
347 if self.active_failover_started_unix_ms.is_none() {
348 self.failover_events_total = self.failover_events_total.saturating_add(1);
349 self.active_failover_started_unix_ms = Some(unix_ms_now());
350 }
351 }
352
353 fn complete_failover(&mut self) -> Option<u64> {
354 let start = self.active_failover_started_unix_ms.take()?;
355 let duration = unix_ms_now().saturating_sub(start);
356 self.failover_completed_total = self.failover_completed_total.saturating_add(1);
357 self.last_failover_duration_ms = Some(duration);
358 self.cumulative_failover_duration_ms = self
359 .cumulative_failover_duration_ms
360 .saturating_add(duration as u128);
361 Some(duration)
362 }
363
364 fn start_restore(&mut self) {
365 if self.active_restore_started_unix_ms.is_none() {
366 self.restore_events_total = self.restore_events_total.saturating_add(1);
367 self.active_restore_started_unix_ms = Some(unix_ms_now());
368 }
369 }
370
371 fn complete_restore(&mut self) -> Option<u64> {
372 let start = self.active_restore_started_unix_ms.take()?;
373 let duration = unix_ms_now().saturating_sub(start);
374 self.restore_completed_total = self.restore_completed_total.saturating_add(1);
375 self.last_restore_duration_ms = Some(duration);
376 self.cumulative_restore_duration_ms = self
377 .cumulative_restore_duration_ms
378 .saturating_add(duration as u128);
379 Some(duration)
380 }
381
382 fn avg_failover_duration_ms(&self) -> f64 {
383 if self.failover_completed_total == 0 {
384 return 0.0;
385 }
386 self.cumulative_failover_duration_ms as f64 / self.failover_completed_total as f64
387 }
388
389 fn avg_restore_duration_ms(&self) -> f64 {
390 if self.restore_completed_total == 0 {
391 return 0.0;
392 }
393 self.cumulative_restore_duration_ms as f64 / self.restore_completed_total as f64
394 }
395
396 fn to_json(&self, active_chaos_faults: usize) -> Value {
397 json!({
398 "failover_events_total": self.failover_events_total,
399 "failover_completed_total": self.failover_completed_total,
400 "active_failover_started_unix_ms": self.active_failover_started_unix_ms,
401 "last_failover_duration_ms": self.last_failover_duration_ms,
402 "avg_failover_duration_ms": self.avg_failover_duration_ms(),
403 "restore_events_total": self.restore_events_total,
404 "restore_completed_total": self.restore_completed_total,
405 "active_restore_started_unix_ms": self.active_restore_started_unix_ms,
406 "last_restore_duration_ms": self.last_restore_duration_ms,
407 "avg_restore_duration_ms": self.avg_restore_duration_ms(),
408 "chaos_injections_total": self.chaos_injections_total,
409 "chaos_blocked_requests_total": self.chaos_blocked_requests_total,
410 "chaos_active_faults": active_chaos_faults,
411 })
412 }
413}
414
// ---------------------------------------------------------------------------
// Request types. Field order and serde attributes are load-bearing (Debug
// output, serde behavior). Most handlers live past the visible part of
// `build_response`, so semantics are only stated where visible code uses them.
// ---------------------------------------------------------------------------

/// A parsed HTTP/1.x request as produced by `read_http_request`.
#[derive(Debug)]
struct HttpRequest {
    method: String,
    /// Raw request target, including any `?query` suffix (split later by
    /// `split_path_and_query`).
    path: String,
    /// Header names lower-cased, values trimmed; a repeated header keeps the last value.
    headers: HashMap<String, String>,
    /// Exactly `Content-Length` bytes (empty when the header is absent or unparseable).
    body: Vec<u8>,
}

/// Optional JSON body carrying a leader id (handler not visible in this chunk).
#[derive(Debug, Deserialize, Default)]
struct PromoteRequest {
    leader_id: Option<String>,
}

/// Optional JSON body carrying a leader id (handler not visible in this chunk).
#[derive(Debug, Deserialize, Default)]
struct StepDownRequest {
    leader_id: Option<String>,
}

/// Recovery request with an optional backup artifact and note (handler not visible here).
#[derive(Debug, Deserialize, Default)]
struct RecoveryRequest {
    backup_artifact: Option<String>,
    note: Option<String>,
}

/// Optional note attached to starting a recovery (handler not visible here).
#[derive(Debug, Deserialize, Default)]
struct RecoveryStartRequest {
    note: Option<String>,
}

/// Optional note attached to a snapshot request (handler not visible here).
#[derive(Debug, Deserialize, Default)]
struct RecoverySnapshotRequest {
    note: Option<String>,
}

/// Verify-restore request: an explicit snapshot path and/or a target time.
#[derive(Debug, Deserialize, Default)]
struct RecoveryVerifyRestoreRequest {
    snapshot_path: Option<String>,
    target_unix_ms: Option<u64>,
    note: Option<String>,
    // Missing in the JSON body => `false`.
    #[serde(default)]
    keep_artifact: bool,
}

/// Snapshot pruning request; `None` presumably falls back to a configured
/// retention (handler not visible here — confirm).
#[derive(Debug, Deserialize, Default)]
struct RecoveryPruneRequest {
    retention_seconds: Option<u64>,
}

/// Body for appending an entry to the replication log.
#[derive(Debug, Deserialize)]
struct ReplicationAppendRequest {
    operation: Option<String>,
    // Missing in the JSON body => `Value::Null`.
    #[serde(default)]
    payload: Value,
}

/// Entries pushed from a leader to a follower. The field set appears modeled on
/// Raft AppendEntries — NOTE(review): verify against the handler.
#[derive(Debug, Deserialize)]
struct ReplicationReceiveRequest {
    term: u64,
    leader_id: String,
    prev_log_index: u64,
    prev_log_term: u64,
    // Missing in the JSON body => empty vector.
    #[serde(default)]
    entries: Vec<ReplicationLogEntry>,
    leader_commit: Option<u64>,
}

/// A replica acknowledging replication up to `index`.
#[derive(Debug, Deserialize)]
struct ReplicationAckRequest {
    node_id: String,
    index: u64,
}

/// Replica-reported progress used for reconciliation (handler not visible here).
#[derive(Debug, Deserialize, Default)]
struct ReplicationReconcileRequest {
    node_id: Option<String>,
    last_applied_index: Option<u64>,
    commit_index: Option<u64>,
    replication_lag_ms: Option<u64>,
}

/// A candidate's vote request, with its term and last-log position.
#[derive(Debug, Deserialize)]
struct ElectionVoteRequest {
    term: u64,
    candidate_id: String,
    candidate_last_log_index: u64,
    candidate_last_log_term: u64,
}

/// Leader heartbeat body.
#[derive(Debug, Deserialize)]
struct ElectionHeartbeatRequest {
    term: u64,
    leader_id: String,
    commit_index: u64,
    leader_last_log_index: Option<u64>,
    replication_lag_ms: Option<u64>,
}

/// Manual automatic-failover evaluation request.
#[derive(Debug, Deserialize, Default)]
struct AutoFailoverCheckRequest {
    // Missing in the JSON body => `false`.
    #[serde(default)]
    force: bool,
    simulate_elapsed_ms: Option<u64>,
    reason: Option<String>,
}

/// Chaos injection body, consumed by `ChaosHarnessState::inject`.
#[derive(Debug, Deserialize)]
struct ChaosInjectRequest {
    scenario: ChaosScenario,
    /// `None` => the fault never expires on its own (see `cleanup_expired`).
    duration_ms: Option<u64>,
    note: Option<String>,
    /// `None` is stored as an empty list.
    blocked_nodes: Option<Vec<String>>,
}

/// Chaos clear body. The scenario is presumably forwarded to
/// `ChaosHarnessState::clear`, where `None` clears every fault (confirm).
#[derive(Debug, Deserialize, Default)]
struct ChaosClearRequest {
    scenario: Option<ChaosScenario>,
}

/// Alert-simulation inputs. `sql_error_rate` overrides the observed SQL error rate;
/// the other fields presumably override their observed counterparts the same way
/// (handler continues past this chunk — confirm).
#[derive(Debug, Deserialize, Default)]
struct AlertSimulationRequest {
    sql_error_rate: Option<f64>,
    sql_avg_latency_ms: Option<f64>,
    replication_lag_ms: Option<u64>,
    restore_active_ms: Option<u64>,
}

/// Audit-export filters and output options; the filters map onto `AuditQuery`.
#[derive(Debug, Deserialize, Default)]
struct SecurityAuditExportRequest {
    actor_id: Option<String>,
    tenant_id: Option<String>,
    operation: Option<AccessOperation>,
    allowed: Option<bool>,
    from_unix_ms: Option<u64>,
    to_unix_ms: Option<u64>,
    limit: Option<usize>,
    // NOTE(review): client-supplied and passed to `export_audit_events` as a
    // server-side filesystem path — confirm this is intended and restricted.
    output_path: Option<String>,
    /// `"json"` or `"jsonl"`; absent means `jsonl`, anything else gets HTTP 400.
    format: Option<String>,
}

/// Rerank-hook request (handler not visible in this chunk). `query_embedding`
/// and `metadata_filters` also appear in the default audit redaction list.
#[derive(Debug, Deserialize, Default)]
struct RerankHookRequest {
    query_text: Option<String>,
    query_embedding: Option<Vec<f32>>,
    candidate_count: Option<usize>,
    alpha: Option<f32>,
    candidate_limit: Option<usize>,
    query_profile: Option<String>,
    metadata_filters: Option<HashMap<String, String>>,
    doc_id: Option<String>,
}
565
566pub fn serve_health_endpoints(
567 db_path: impl AsRef<Path>,
568 runtime: RuntimeConfig,
569 config: ServerConfig,
570) -> Result<()> {
571 config
572 .ha_profile
573 .validate()
574 .map_err(std::io::Error::other)?;
575
576 let db_path = db_path.as_ref().to_path_buf();
577 let db = SqlRite::open_with_config(&db_path, runtime.clone())?;
578 let listener = TcpListener::bind(&config.bind_addr)?;
579
580 let mut control = ControlPlaneState::new(config.ha_profile.clone());
581 restore_replication_state(&db_path, &mut control)?;
582 let state = Arc::new(Mutex::new(control));
583
584 for stream in listener.incoming() {
585 let mut stream = match stream {
586 Ok(stream) => stream,
587 Err(_) => continue,
588 };
589
590 if let Err(error) = handle_connection(
591 &db,
592 &db_path,
593 runtime.durability_profile,
594 &config,
595 &state,
596 &mut stream,
597 ) {
598 let _ = write_response(
599 &mut stream,
600 500,
601 "text/plain; charset=utf-8",
602 &format!("internal error: {error}"),
603 );
604 }
605 }
606
607 Ok(())
608}
609
610fn handle_connection(
611 db: &SqlRite,
612 db_path: &Path,
613 sql_profile: DurabilityProfile,
614 config: &ServerConfig,
615 state: &Arc<Mutex<ControlPlaneState>>,
616 stream: &mut TcpStream,
617) -> Result<()> {
618 let started_unix_ms = unix_ms_now();
619 let request = read_http_request(stream)?;
620 let (status, content_type, body) =
621 build_response(db, db_path, sql_profile, config, state, &request)?;
622 write_response(stream, status, content_type, &body)?;
623 if let Ok(mut control) = state.lock() {
624 let duration_ms = unix_ms_now().saturating_sub(started_unix_ms);
625 control
626 .observability
627 .record_request(&request.method, &request.path, status, duration_ms);
628 }
629 Ok(())
630}
631
632fn read_http_request(stream: &TcpStream) -> Result<HttpRequest> {
633 let mut reader = BufReader::new(stream.try_clone()?);
634 let mut first_line = String::new();
635 reader.read_line(&mut first_line)?;
636
637 let (method, path) = parse_http_request_line(&first_line)
638 .ok_or_else(|| std::io::Error::other("invalid HTTP request line"))?;
639
640 let mut headers = HashMap::new();
641 loop {
642 let mut line = String::new();
643 let read = reader.read_line(&mut line)?;
644 if read == 0 || line == "\r\n" || line == "\n" {
645 break;
646 }
647 if let Some((name, value)) = line.split_once(':') {
648 headers.insert(
649 name.trim().to_ascii_lowercase(),
650 value.trim().trim_end_matches('\r').to_string(),
651 );
652 }
653 }
654
655 let content_length = headers
656 .get("content-length")
657 .and_then(|raw| raw.parse::<usize>().ok())
658 .unwrap_or(0);
659
660 let mut body = vec![0u8; content_length];
661 if content_length > 0 {
662 reader.read_exact(&mut body)?;
663 }
664
665 Ok(HttpRequest {
666 method: method.to_string(),
667 path: path.to_string(),
668 headers,
669 body,
670 })
671}
672
/// Splits an HTTP request line into `(method, target)`.
///
/// These are the first two whitespace-separated tokens; anything after them,
/// such as the protocol version, is ignored. Returns `None` when fewer than two
/// tokens are present.
fn parse_http_request_line(first_line: &str) -> Option<(&str, &str)> {
    let mut tokens = first_line.split_whitespace();
    match (tokens.next(), tokens.next()) {
        (Some(method), Some(target)) => Some((method, target)),
        _ => None,
    }
}
679
680fn build_response(
681 db: &SqlRite,
682 db_path: &Path,
683 sql_profile: DurabilityProfile,
684 config: &ServerConfig,
685 state: &Arc<Mutex<ControlPlaneState>>,
686 request: &HttpRequest,
687) -> Result<(u16, &'static str, String)> {
688 let (path, query) = split_path_and_query(&request.path);
689 let now_ms = unix_ms_now();
690
691 let chaos_control_endpoint = matches!(
692 path,
693 "/control/v1/chaos/status" | "/control/v1/chaos/inject" | "/control/v1/chaos/clear"
694 );
695 {
696 let mut control = state
697 .lock()
698 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
699 control.chaos.cleanup_expired(now_ms);
700 if !chaos_control_endpoint
701 && let Some(blocked) = chaos_blocking_response(&mut control, request, path)
702 {
703 return Ok(blocked);
704 }
705
706 let skip_auto_failover = matches!(
707 path,
708 "/control/v1/election/heartbeat"
709 | "/control/v1/replication/receive"
710 | "/control/v1/failover/auto-check"
711 );
712 if !skip_auto_failover {
713 let _ = maybe_trigger_automatic_failover(
714 db_path,
715 &mut control,
716 AutoFailoverEvalInput {
717 force: false,
718 simulated_elapsed_ms: None,
719 reason: Some("periodic_request_tick".to_string()),
720 },
721 )?;
722 }
723 }
724
725 match (request.method.as_str(), path) {
726 ("GET", "/healthz") => {
727 let report = build_health_report(db)?;
728 let control = state
729 .lock()
730 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
731 let payload = json!({
732 "storage": report,
733 "ha": control.snapshot_json(),
734 });
735 Ok((200, "application/json", payload.to_string()))
736 }
737 ("GET", "/readyz") => {
738 let report = build_health_report(db)?;
739 let control = state
740 .lock()
741 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
742 let mut ready = report.integrity_check_ok;
743 if control.profile.replication.enabled
744 && control.runtime.role != ServerRole::Primary
745 && control.runtime.leader_id.is_none()
746 {
747 ready = false;
748 }
749
750 let status = if ready { 200 } else { 503 };
751 let payload = json!({
752 "ready": ready,
753 "schema_version": report.schema_version,
754 "ha_enabled": control.profile.replication.enabled,
755 "role": control.runtime.role,
756 "leader_id": control.runtime.leader_id,
757 "term": control.runtime.current_term,
758 "commit_index": control.runtime.commit_index,
759 "last_log_index": control.runtime.last_log_index,
760 "active_chaos_faults": control.chaos.active_count(),
761 });
762 Ok((status, "application/json", payload.to_string()))
763 }
764 ("GET", "/metrics") => {
765 let report = build_health_report(db)?;
766 let control = state
767 .lock()
768 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
769 let role_metric = match control.runtime.role {
770 ServerRole::Standalone => 0,
771 ServerRole::Primary => 1,
772 ServerRole::Replica => 2,
773 };
774
775 let body = format!(
776 "sqlrite_chunk_count {}\n\
777 sqlrite_schema_version {}\n\
778 sqlrite_index_entries {}\n\
779 sqlrite_ha_enabled {}\n\
780 sqlrite_ha_role {}\n\
781 sqlrite_ha_term {}\n\
782 sqlrite_ha_commit_index {}\n\
783 sqlrite_ha_last_log_index {}\n\
784 sqlrite_ha_last_log_term {}\n\
785 sqlrite_ha_replication_log_entries {}\n\
786 sqlrite_ha_replication_lag_ms {}\n\
787 sqlrite_ha_failover_in_progress {}\n\
788 sqlrite_ha_failover_events_total {}\n\
789 sqlrite_ha_failover_completed_total {}\n\
790 sqlrite_ha_failover_last_duration_ms {}\n\
791 sqlrite_ha_failover_avg_duration_ms {}\n\
792 sqlrite_ha_restore_events_total {}\n\
793 sqlrite_ha_restore_completed_total {}\n\
794 sqlrite_ha_restore_last_duration_ms {}\n\
795 sqlrite_ha_restore_avg_duration_ms {}\n\
796 sqlrite_ha_chaos_faults_active {}\n\
797 sqlrite_ha_chaos_injections_total {}\n\
798 sqlrite_ha_chaos_blocked_requests_total {}\n\
799 sqlrite_requests_total {}\n\
800 sqlrite_requests_server_errors_total {}\n\
801 sqlrite_requests_client_errors_total {}\n\
802 sqlrite_requests_sql_total {}\n\
803 sqlrite_requests_sql_errors_total {}\n\
804 sqlrite_requests_sql_avg_latency_ms {}\n\
805 sqlrite_requests_sql_max_latency_ms {}\n\
806 sqlrite_observability_traces_buffered {}\n\
807 sqlrite_alert_simulations_total {}\n",
808 report.chunk_count,
809 report.schema_version,
810 report.vector_index_entries,
811 if control.profile.replication.enabled {
812 1
813 } else {
814 0
815 },
816 role_metric,
817 control.runtime.current_term,
818 control.runtime.commit_index,
819 control.runtime.last_log_index,
820 control.runtime.last_log_term,
821 control.replication_log.len(),
822 control.runtime.replication_lag_ms,
823 if control.runtime.failover_in_progress {
824 1
825 } else {
826 0
827 },
828 control.resilience.failover_events_total,
829 control.resilience.failover_completed_total,
830 control.resilience.last_failover_duration_ms.unwrap_or(0),
831 control.resilience.avg_failover_duration_ms(),
832 control.resilience.restore_events_total,
833 control.resilience.restore_completed_total,
834 control.resilience.last_restore_duration_ms.unwrap_or(0),
835 control.resilience.avg_restore_duration_ms(),
836 control.chaos.active_count(),
837 control.resilience.chaos_injections_total,
838 control.resilience.chaos_blocked_requests_total,
839 control.observability.requests_total,
840 control.observability.requests_server_errors_total,
841 control.observability.requests_client_errors_total,
842 control.observability.sql_requests_total,
843 control.observability.sql_requests_failed_total,
844 control.observability.sql_avg_latency_ms(),
845 control.observability.sql_latency_max_ms,
846 control.observability.traces.len(),
847 control.observability.alert_simulations_total,
848 );
849 Ok((200, "text/plain; version=0.0.4", body))
850 }
851 ("GET", "/control/v1/profile") => {
852 let control = state
853 .lock()
854 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
855 Ok((
856 200,
857 "application/json",
858 serde_json::to_string(&control.profile)?,
859 ))
860 }
861 ("GET", "/control/v1/security") => Ok((
862 200,
863 "application/json",
864 security_summary_json(config).to_string(),
865 )),
866 ("POST", "/control/v1/security/audit/export") => {
867 if !authorize_control_request(request, config) {
868 return Ok(unauthorized_response());
869 }
870 let Some(audit_path) = &config.security.audit_log_path else {
871 return Ok((
872 400,
873 "application/json",
874 json!({"error": "audit log path is not configured"}).to_string(),
875 ));
876 };
877 let input = parse_optional_json_body::<SecurityAuditExportRequest>(request)
878 .map_err(std::io::Error::other)?;
879 let format = match input.format.as_deref() {
880 Some("json") => AuditExportFormat::Json,
881 Some("jsonl") | None => AuditExportFormat::Jsonl,
882 Some(other) => return Ok((
883 400,
884 "application/json",
885 json!({"error": format!("invalid format `{other}`; expected json or jsonl")})
886 .to_string(),
887 )),
888 };
889 let report = export_audit_events(
890 audit_path,
891 &AuditQuery {
892 actor_id: input.actor_id,
893 tenant_id: input.tenant_id,
894 operation: input.operation,
895 allowed: input.allowed,
896 from_unix_ms: input.from_unix_ms,
897 to_unix_ms: input.to_unix_ms,
898 limit: input.limit,
899 },
900 input.output_path.as_deref().map(Path::new),
901 format,
902 )?;
903 Ok((200, "application/json", serde_json::to_string(&report)?))
904 }
905 ("GET", "/control/v1/state") => {
906 let control = state
907 .lock()
908 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
909 Ok((200, "application/json", control.snapshot_json().to_string()))
910 }
911 ("GET", "/control/v1/peers") => {
912 let control = state
913 .lock()
914 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
915 let payload = json!({
916 "node_id": control.profile.replication.node_id,
917 "advertise_addr": control.profile.replication.advertise_addr,
918 "peers": control.profile.replication.peers,
919 "sync_ack_quorum": control.profile.replication.sync_ack_quorum,
920 "replica_progress": control.replica_progress,
921 });
922 Ok((200, "application/json", payload.to_string()))
923 }
924 ("GET", "/control/v1/replication/log") => {
925 let from = query
926 .get("from")
927 .and_then(|raw| raw.parse::<u64>().ok())
928 .unwrap_or(1);
929 let limit = query
930 .get("limit")
931 .and_then(|raw| raw.parse::<usize>().ok())
932 .unwrap_or(256)
933 .min(1_024);
934
935 let control = state
936 .lock()
937 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
938 let payload = json!({
939 "from": from,
940 "limit": limit,
941 "entries": control.replication_log.entries_from(from, limit),
942 "last_log_index": control.runtime.last_log_index,
943 "last_log_term": control.runtime.last_log_term,
944 "commit_index": control.runtime.commit_index,
945 });
946 Ok((200, "application/json", payload.to_string()))
947 }
948 ("GET", "/control/v1/resilience") => {
949 let control = state
950 .lock()
951 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
952 Ok((
953 200,
954 "application/json",
955 control
956 .resilience
957 .to_json(control.chaos.active_count())
958 .to_string(),
959 ))
960 }
961 ("GET", "/control/v1/observability/metrics-map") => {
962 let control = state
963 .lock()
964 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
965 let payload = json!({
966 "metrics": [
967 {"name":"sqlrite_chunk_count","group":"storage","type":"gauge"},
968 {"name":"sqlrite_schema_version","group":"storage","type":"gauge"},
969 {"name":"sqlrite_index_entries","group":"retrieval","type":"gauge"},
970 {"name":"sqlrite_ha_enabled","group":"ha","type":"gauge"},
971 {"name":"sqlrite_ha_role","group":"ha","type":"gauge"},
972 {"name":"sqlrite_ha_term","group":"ha","type":"gauge"},
973 {"name":"sqlrite_ha_commit_index","group":"ha","type":"gauge"},
974 {"name":"sqlrite_ha_last_log_index","group":"ha","type":"gauge"},
975 {"name":"sqlrite_ha_last_log_term","group":"ha","type":"gauge"},
976 {"name":"sqlrite_ha_replication_lag_ms","group":"ha","type":"gauge"},
977 {"name":"sqlrite_ha_failover_events_total","group":"resilience","type":"counter"},
978 {"name":"sqlrite_ha_failover_completed_total","group":"resilience","type":"counter"},
979 {"name":"sqlrite_ha_restore_events_total","group":"resilience","type":"counter"},
980 {"name":"sqlrite_ha_restore_completed_total","group":"resilience","type":"counter"},
981 {"name":"sqlrite_ha_chaos_injections_total","group":"chaos","type":"counter"},
982 {"name":"sqlrite_ha_chaos_blocked_requests_total","group":"chaos","type":"counter"},
983 {"name":"sqlrite_requests_total","group":"http","type":"counter"},
984 {"name":"sqlrite_requests_server_errors_total","group":"http","type":"counter"},
985 {"name":"sqlrite_requests_client_errors_total","group":"http","type":"counter"},
986 {"name":"sqlrite_requests_sql_total","group":"sql","type":"counter"},
987 {"name":"sqlrite_requests_sql_errors_total","group":"sql","type":"counter"},
988 {"name":"sqlrite_requests_sql_avg_latency_ms","group":"sql","type":"gauge"},
989 {"name":"sqlrite_requests_sql_max_latency_ms","group":"sql","type":"gauge"},
990 {"name":"sqlrite_observability_traces_buffered","group":"observability","type":"gauge"},
991 {"name":"sqlrite_alert_simulations_total","group":"observability","type":"counter"}
992 ],
993 "state": control.observability.to_json(),
994 });
995 Ok((200, "application/json", payload.to_string()))
996 }
997 ("GET", "/control/v1/traces/recent") => {
998 let limit = query
999 .get("limit")
1000 .and_then(|raw| raw.parse::<usize>().ok())
1001 .unwrap_or(50)
1002 .min(512);
1003 let control = state
1004 .lock()
1005 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1006 let payload = json!({
1007 "limit": limit,
1008 "traces": control.observability.recent_traces(limit),
1009 });
1010 Ok((200, "application/json", payload.to_string()))
1011 }
1012 ("POST", "/control/v1/observability/reset") => {
1013 if !authorize_control_request(request, config) {
1014 return Ok(unauthorized_response());
1015 }
1016 let mut control = state
1017 .lock()
1018 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1019 control.observability.reset();
1020 let payload = json!({
1021 "status": "reset",
1022 "observability": control.observability.to_json(),
1023 });
1024 Ok((200, "application/json", payload.to_string()))
1025 }
1026 ("GET", "/control/v1/alerts/templates") => {
1027 let control = state
1028 .lock()
1029 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1030 let payload = json!({
1031 "templates": [
1032 {
1033 "id": "sql_error_rate_high",
1034 "severity": "warning",
1035 "threshold": {"sql_error_rate_gt": 0.05},
1036 "description": "SQL API error rate exceeded 5%"
1037 },
1038 {
1039 "id": "sql_latency_high",
1040 "severity": "warning",
1041 "threshold": {"sql_avg_latency_ms_gt": 50.0},
1042 "description": "SQL API average latency exceeded 50ms"
1043 },
1044 {
1045 "id": "replication_lag_high",
1046 "severity": "critical",
1047 "threshold": {"replication_lag_ms_gt": control.profile.replication.max_replication_lag_ms},
1048 "description": "Replication lag exceeded configured max_replication_lag_ms"
1049 },
1050 {
1051 "id": "restore_stuck",
1052 "severity": "critical",
1053 "threshold": {"restore_active_ms_gt": control.profile.recovery.snapshot_interval_seconds.saturating_mul(1_000)},
1054 "description": "Restore workflow appears stuck beyond snapshot interval"
1055 }
1056 ]
1057 });
1058 Ok((200, "application/json", payload.to_string()))
1059 }
1060 ("POST", "/control/v1/alerts/simulate") => {
1061 if !authorize_control_request(request, config) {
1062 return Ok(unauthorized_response());
1063 }
1064 let input = parse_optional_json_body::<AlertSimulationRequest>(request)
1065 .map_err(std::io::Error::other)?;
1066 let mut control = state
1067 .lock()
1068 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1069
1070 let observed_sql_error_rate = if control.observability.sql_requests_total == 0 {
1071 0.0
1072 } else {
1073 control.observability.sql_requests_failed_total as f64
1074 / control.observability.sql_requests_total as f64
1075 };
1076 let observed_restore_active_ms = control
1077 .resilience
1078 .active_restore_started_unix_ms
1079 .map(|start| now_ms.saturating_sub(start))
1080 .unwrap_or(0);
1081 let eval_sql_error_rate = input.sql_error_rate.unwrap_or(observed_sql_error_rate);
1082 let eval_sql_avg_latency_ms = input
1083 .sql_avg_latency_ms
1084 .unwrap_or(control.observability.sql_avg_latency_ms());
1085 let eval_replication_lag_ms = input
1086 .replication_lag_ms
1087 .unwrap_or(control.runtime.replication_lag_ms);
1088 let eval_restore_active_ms = input
1089 .restore_active_ms
1090 .unwrap_or(observed_restore_active_ms);
1091
1092 let mut fired = Vec::<Value>::new();
1093 if eval_sql_error_rate > 0.05 {
1094 fired.push(json!({"id":"sql_error_rate_high","severity":"warning","value":eval_sql_error_rate}));
1095 }
1096 if eval_sql_avg_latency_ms > 50.0 {
1097 fired.push(json!({"id":"sql_latency_high","severity":"warning","value":eval_sql_avg_latency_ms}));
1098 }
1099 if eval_replication_lag_ms > control.profile.replication.max_replication_lag_ms {
1100 fired.push(json!({"id":"replication_lag_high","severity":"critical","value":eval_replication_lag_ms}));
1101 }
1102 if eval_restore_active_ms
1103 > control
1104 .profile
1105 .recovery
1106 .snapshot_interval_seconds
1107 .saturating_mul(1_000)
1108 {
1109 fired.push(json!({"id":"restore_stuck","severity":"critical","value":eval_restore_active_ms}));
1110 }
1111 control.observability.alert_simulations_total = control
1112 .observability
1113 .alert_simulations_total
1114 .saturating_add(1);
1115 let payload = json!({
1116 "fired_alerts": fired,
1117 "evaluated": {
1118 "sql_error_rate": eval_sql_error_rate,
1119 "sql_avg_latency_ms": eval_sql_avg_latency_ms,
1120 "replication_lag_ms": eval_replication_lag_ms,
1121 "restore_active_ms": eval_restore_active_ms,
1122 },
1123 "simulation_count": control.observability.alert_simulations_total,
1124 });
1125 Ok((200, "application/json", payload.to_string()))
1126 }
1127 ("GET", "/control/v1/slo/report") => {
1128 let control = state
1129 .lock()
1130 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1131 let availability_percent = if control.observability.requests_total == 0 {
1132 100.0
1133 } else {
1134 let successful = control
1135 .observability
1136 .requests_total
1137 .saturating_sub(control.observability.requests_server_errors_total);
1138 (successful as f64 / control.observability.requests_total as f64) * 100.0
1139 };
1140 let rpo_seconds = (control.runtime.replication_lag_ms as f64) / 1_000.0;
1141 let payload = json!({
1142 "availability": {
1143 "observed_percent": availability_percent,
1144 "target_percent": 99.95,
1145 "passes_target": availability_percent >= 99.95,
1146 "requests_total": control.observability.requests_total,
1147 "server_errors_total": control.observability.requests_server_errors_total,
1148 },
1149 "rpo": {
1150 "observed_seconds": rpo_seconds,
1151 "target_seconds": 60.0,
1152 "passes_target": rpo_seconds <= 60.0,
1153 },
1154 "resilience": control.resilience.to_json(control.chaos.active_count()),
1155 "observability": control.observability.to_json(),
1156 });
1157 Ok((200, "application/json", payload.to_string()))
1158 }
1159 ("GET", "/control/v1/failover/status") => {
1160 let control = state
1161 .lock()
1162 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1163 let heartbeat_age_ms = control
1164 .runtime
1165 .last_heartbeat_unix_ms
1166 .map(|last| now_ms.saturating_sub(last));
1167 let payload = json!({
1168 "automatic_enabled": control.profile.replication.failover_mode == FailoverMode::Automatic,
1169 "heartbeat_age_ms": heartbeat_age_ms,
1170 "election_timeout_ms": control.profile.replication.election_timeout_ms,
1171 "role": control.runtime.role,
1172 "leader_id": control.runtime.leader_id,
1173 "failover_in_progress": control.runtime.failover_in_progress,
1174 "resilience": control.resilience.to_json(control.chaos.active_count()),
1175 });
1176 Ok((200, "application/json", payload.to_string()))
1177 }
// Authenticated: asks `maybe_trigger_automatic_failover` whether an automatic
// failover should fire now. The optional body can force it, simulate elapsed
// time, and supply a reason.
("POST", "/control/v1/failover/auto-check") => {
    if !authorize_control_request(request, config) {
        return Ok(unauthorized_response());
    }
    let check = parse_optional_json_body::<AutoFailoverCheckRequest>(request)
        .map_err(std::io::Error::other)?;
    // The state guard is handed to the evaluator mutably, so the decision and
    // any transition it makes happen under the control-plane lock.
    let mut control = state
        .lock()
        .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
    let event = maybe_trigger_automatic_failover(
        db_path,
        &mut control,
        AutoFailoverEvalInput {
            force: check.force,
            simulated_elapsed_ms: check.simulate_elapsed_ms,
            reason: check.reason,
        },
    )?;
    // `triggered` mirrors whether the evaluator returned an event.
    let payload = json!({
        "triggered": event.is_some(),
        "event": event,
        "state": control.runtime,
        "resilience": control.resilience.to_json(control.chaos.active_count()),
    });
    Ok((200, "application/json", payload.to_string()))
}
// Authenticated: marks a manual failover as started in both the runtime state
// and the resilience tracker. It persists the event and a `failover_started`
// marker via `db_path`, then returns the serialized runtime state.
("POST", "/control/v1/failover/start") => {
    if !authorize_control_request(request, config) {
        return Ok(unauthorized_response());
    }

    let mut control = state
        .lock()
        .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
    control.runtime.mark_failover_started();
    control.resilience.start_failover();
    // Persisted while the lock is still held, so the stored record follows the
    // in-memory transition before another request can change it.
    persist_resilience_event(db_path, "failover_started", None, "manual_start")?;
    persist_runtime_marker(db_path, "failover_started", "true")?;
    Ok((
        200,
        "application/json",
        serde_json::to_string(&control.runtime)?,
    ))
}
// Authenticated: promotes this node to primary. The optional body may name the
// leader. Otherwise this node's configured `node_id` is used.
("POST", "/control/v1/failover/promote") => {
    if !authorize_control_request(request, config) {
        return Ok(unauthorized_response());
    }

    let promote = parse_optional_json_body::<PromoteRequest>(request)
        .map_err(std::io::Error::other)?;
    let mut control = state
        .lock()
        .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;

    let leader = promote
        .leader_id
        .unwrap_or_else(|| control.profile.replication.node_id.clone());
    // Promotion also switches the profile into replication mode as a primary.
    control.profile.replication.enabled = true;
    control.profile.replication.role = ServerRole::Primary;
    control.runtime.promote_to_primary(leader);
    // `complete_failover` yields a duration only in some states (presumably
    // when a failover had been started). Only then is a completion event stored.
    if let Some(duration_ms) = control.resilience.complete_failover() {
        persist_resilience_event(
            db_path,
            "failover_completed",
            Some(duration_ms),
            "manual_promote",
        )?;
    }
    // Snapshot of the runtime state after the transition, stored as a marker.
    persist_runtime_marker(
        db_path,
        "last_role_transition",
        &serde_json::to_string(&control.runtime)?,
    )?;
    Ok((
        200,
        "application/json",
        serde_json::to_string(&control.runtime)?,
    ))
}
// Authenticated: demotes this node to replica, optionally recording the new
// leader from the body, and persists the resulting runtime state as the
// `last_role_transition` marker.
("POST", "/control/v1/failover/step-down") => {
    if !authorize_control_request(request, config) {
        return Ok(unauthorized_response());
    }

    let step_down = parse_optional_json_body::<StepDownRequest>(request)
        .map_err(std::io::Error::other)?;
    let mut control = state
        .lock()
        .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;

    // Stepping down keeps replication enabled but switches the profile role.
    control.profile.replication.enabled = true;
    control.profile.replication.role = ServerRole::Replica;
    control.runtime.step_down_to_replica(step_down.leader_id);
    persist_runtime_marker(
        db_path,
        "last_role_transition",
        &serde_json::to_string(&control.runtime)?,
    )?;
    Ok((
        200,
        "application/json",
        serde_json::to_string(&control.runtime)?,
    ))
}
// Authenticated: records that a restore has finished. The recovery event text
// is built from the optional `backup_artifact` and `note` fields, with
// placeholders when they are absent.
("POST", "/control/v1/recovery/mark-restored") => {
    if !authorize_control_request(request, config) {
        return Ok(unauthorized_response());
    }

    let recovery = parse_optional_json_body::<RecoveryRequest>(request)
        .map_err(std::io::Error::other)?;
    let mut control = state
        .lock()
        .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;

    let event = format!(
        "backup_artifact={} note={}",
        recovery
            .backup_artifact
            .unwrap_or_else(|| "<unspecified>".to_string()),
        recovery.note.unwrap_or_else(|| "<none>".to_string())
    );
    control.runtime.mark_recovery_event(event.clone());
    // If no restore window is open, open one first so `complete_restore` has a
    // start to close (presumably yielding a near-zero duration in that case).
    if control.resilience.active_restore_started_unix_ms.is_none() {
        control.resilience.start_restore();
    }
    if let Some(duration_ms) = control.resilience.complete_restore() {
        persist_resilience_event(
            db_path,
            "restore_completed",
            Some(duration_ms),
            "recovery_mark_restored",
        )?;
    }
    persist_runtime_marker(db_path, "last_recovery_event", &event)?;

    Ok((
        200,
        "application/json",
        serde_json::to_string(&control.runtime)?,
    ))
}
1321 ("POST", "/control/v1/recovery/start") => {
1322 if !authorize_control_request(request, config) {
1323 return Ok(unauthorized_response());
1324 }
1325 let recovery = parse_optional_json_body::<RecoveryStartRequest>(request)
1326 .map_err(std::io::Error::other)?;
1327 let mut control = state
1328 .lock()
1329 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1330 control.resilience.start_restore();
1331 let note = recovery
1332 .note
1333 .unwrap_or_else(|| "restore_started".to_string());
1334 persist_resilience_event(db_path, "restore_started", None, ¬e)?;
1335 Ok((
1336 200,
1337 "application/json",
1338 json!({
1339 "restore_in_progress": true,
1340 "started_unix_ms": control.resilience.active_restore_started_unix_ms,
1341 })
1342 .to_string(),
1343 ))
1344 }
1345 ("POST", "/control/v1/recovery/snapshot") => {
1346 if !authorize_control_request(request, config) {
1347 return Ok(unauthorized_response());
1348 }
1349 let input = parse_optional_json_body::<RecoverySnapshotRequest>(request)
1350 .map_err(std::io::Error::other)?;
1351 let backup_dir = {
1352 let control = state
1353 .lock()
1354 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1355 control.profile.recovery.backup_dir.clone()
1356 };
1357 let snapshot = create_backup_snapshot(db_path, &backup_dir, input.note.as_deref())?;
1358 let payload = json!({
1359 "snapshot": snapshot,
1360 "backup_dir": backup_dir,
1361 });
1362 Ok((200, "application/json", payload.to_string()))
1363 }
1364 ("GET", "/control/v1/recovery/snapshots") => {
1365 let backup_dir = {
1366 let control = state
1367 .lock()
1368 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1369 control.profile.recovery.backup_dir.clone()
1370 };
1371 let limit = query
1372 .get("limit")
1373 .and_then(|raw| raw.parse::<usize>().ok())
1374 .unwrap_or(100)
1375 .min(1_000);
1376 let mut snapshots = list_backup_snapshots(&backup_dir)?;
1377 if snapshots.len() > limit {
1378 snapshots.truncate(limit);
1379 }
1380 let payload = json!({
1381 "backup_dir": backup_dir,
1382 "count": snapshots.len(),
1383 "snapshots": snapshots,
1384 });
1385 Ok((200, "application/json", payload.to_string()))
1386 }
1387 ("POST", "/control/v1/recovery/verify-restore") => {
1388 if !authorize_control_request(request, config) {
1389 return Ok(unauthorized_response());
1390 }
1391 let input = parse_optional_json_body::<RecoveryVerifyRestoreRequest>(request)
1392 .map_err(std::io::Error::other)?;
1393 let backup_dir = {
1394 let control = state
1395 .lock()
1396 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1397 control.profile.recovery.backup_dir.clone()
1398 };
1399 let snapshots = list_backup_snapshots(&backup_dir)?;
1400 let selected = if let Some(snapshot_path) = input.snapshot_path.clone() {
1401 snapshots
1402 .into_iter()
1403 .find(|snapshot| snapshot.snapshot_path == snapshot_path)
1404 .ok_or_else(|| {
1405 std::io::Error::other("requested snapshot_path is not in backup catalog")
1406 })?
1407 } else if let Some(target) = input.target_unix_ms {
1408 select_backup_snapshot_for_time(&backup_dir, target)?.ok_or_else(|| {
1409 std::io::Error::other(
1410 "no snapshot exists at or before requested target_unix_ms",
1411 )
1412 })?
1413 } else {
1414 snapshots.into_iter().next().ok_or_else(|| {
1415 std::io::Error::other("no snapshots available for restore verification")
1416 })?
1417 };
1418 let verify_dir = Path::new(&backup_dir).join("restore_verification");
1419 std::fs::create_dir_all(&verify_dir)?;
1420 let target_path = verify_dir.join(format!(
1421 "verify-{}-{}.db",
1422 selected.snapshot_id,
1423 unix_ms_now()
1424 ));
1425 let report = restore_backup_file_verified(&selected.snapshot_path, &target_path)?;
1426 let note = input.note.unwrap_or_else(|| "verify_restore".to_string());
1427 {
1428 let mut control = state
1429 .lock()
1430 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1431 if control.resilience.active_restore_started_unix_ms.is_none() {
1432 control.resilience.start_restore();
1433 persist_resilience_event(db_path, "restore_started", None, ¬e)?;
1434 }
1435 let event = format!(
1436 "verify_restore snapshot={} artifact={}",
1437 selected.snapshot_id,
1438 target_path.display()
1439 );
1440 control.runtime.mark_recovery_event(event.clone());
1441 if let Some(duration_ms) = control.resilience.complete_restore() {
1442 persist_resilience_event(
1443 db_path,
1444 "restore_completed",
1445 Some(duration_ms),
1446 "verify_restore_completed",
1447 )?;
1448 }
1449 persist_runtime_marker(db_path, "last_recovery_event", &event)?;
1450 }
1451
1452 let keep_artifact = input.keep_artifact;
1453 if !keep_artifact {
1454 let _ = std::fs::remove_file(&target_path);
1455 let _ = std::fs::remove_file(Path::new(&format!("{}-wal", target_path.display())));
1456 let _ = std::fs::remove_file(Path::new(&format!("{}-shm", target_path.display())));
1457 }
1458 let payload = json!({
1459 "selected_snapshot": selected,
1460 "restore_verified": report.integrity_check_ok,
1461 "verification": report,
1462 "restore_artifact_path": target_path.display().to_string(),
1463 "artifact_kept": keep_artifact,
1464 });
1465 Ok((200, "application/json", payload.to_string()))
1466 }
1467 ("POST", "/control/v1/recovery/prune-snapshots") => {
1468 if !authorize_control_request(request, config) {
1469 return Ok(unauthorized_response());
1470 }
1471 let input = parse_optional_json_body::<RecoveryPruneRequest>(request)
1472 .map_err(std::io::Error::other)?;
1473 let backup_dir;
1474 let retention_seconds;
1475 {
1476 let control = state
1477 .lock()
1478 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1479 backup_dir = control.profile.recovery.backup_dir.clone();
1480 retention_seconds = input
1481 .retention_seconds
1482 .unwrap_or(control.profile.recovery.pitr_retention_seconds);
1483 }
1484 let report = prune_backup_snapshots(&backup_dir, retention_seconds)?;
1485 let payload = json!({
1486 "backup_dir": backup_dir,
1487 "report": report,
1488 });
1489 Ok((200, "application/json", payload.to_string()))
1490 }
1491 ("GET", "/control/v1/chaos/status") => {
1492 let control = state
1493 .lock()
1494 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1495 let payload = json!({
1496 "chaos": control.chaos.to_json(),
1497 "resilience": control.resilience.to_json(control.chaos.active_count()),
1498 });
1499 Ok((200, "application/json", payload.to_string()))
1500 }
// Authenticated: injects a chaos scenario. The body is required, and a parse
// failure is reported as 400. The arm bumps `chaos_injections_total` and
// persists an "inject" chaos event.
("POST", "/control/v1/chaos/inject") => {
    if !authorize_control_request(request, config) {
        return Ok(unauthorized_response());
    }
    let inject = match parse_json_body::<ChaosInjectRequest>(request) {
        Ok(payload) => payload,
        Err(error) => {
            return Ok((400, "application/json", json!({"error": error}).to_string()));
        }
    };
    let mut control = state
        .lock()
        .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
    // Capture the scenario name before `inject` is moved into the chaos state.
    let scenario = inject.scenario.as_str().to_string();
    control.chaos.inject(inject);
    control.resilience.chaos_injections_total =
        control.resilience.chaos_injections_total.saturating_add(1);
    persist_chaos_event(db_path, "inject", &scenario)?;
    let payload = json!({
        "status": "injected",
        "scenario": scenario,
        "chaos": control.chaos.to_json(),
    });
    Ok((200, "application/json", payload.to_string()))
}
// Authenticated: clears the named chaos scenario, or (presumably) every active
// scenario when none is named. In that case the persisted event records "all"
// and the response's `scenario` is null.
("POST", "/control/v1/chaos/clear") => {
    if !authorize_control_request(request, config) {
        return Ok(unauthorized_response());
    }
    let clear = parse_optional_json_body::<ChaosClearRequest>(request)
        .map_err(std::io::Error::other)?;
    let mut control = state
        .lock()
        .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
    // Name captured before `clear.scenario` is moved into `chaos.clear`.
    let scenario_name = clear.scenario.map(|scenario| scenario.as_str().to_string());
    control.chaos.clear(clear.scenario);
    persist_chaos_event(db_path, "clear", scenario_name.as_deref().unwrap_or("all"))?;
    let payload = json!({
        "status": "cleared",
        "scenario": scenario_name,
        "chaos": control.chaos.to_json(),
    });
    Ok((200, "application/json", payload.to_string()))
}
1545 ("POST", "/control/v1/replication/append") => {
1546 if !authorize_control_request(request, config) {
1547 return Ok(unauthorized_response());
1548 }
1549
1550 let append = match parse_json_body::<ReplicationAppendRequest>(request) {
1551 Ok(payload) => payload,
1552 Err(error) => {
1553 return Ok((400, "application/json", json!({"error": error}).to_string()));
1554 }
1555 };
1556
1557 let mut control = state
1558 .lock()
1559 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1560 if !control.profile.replication.enabled || control.runtime.role != ServerRole::Primary {
1561 return Ok((
1562 409,
1563 "application/json",
1564 json!({"error": "replication append requires enabled primary role"})
1565 .to_string(),
1566 ));
1567 }
1568
1569 control.runtime.current_term = control.runtime.current_term.max(1);
1570 let operation = append
1571 .operation
1572 .unwrap_or_else(|| "state_mutation".to_string())
1573 .trim()
1574 .to_string();
1575 if operation.is_empty() {
1576 return Ok((
1577 400,
1578 "application/json",
1579 json!({"error": "operation cannot be empty"}).to_string(),
1580 ));
1581 }
1582
1583 let current_term = control.runtime.current_term;
1584 let node_id = control.profile.replication.node_id.clone();
1585 let entry = match control.replication_log.append_leader_event(
1586 current_term,
1587 &node_id,
1588 operation,
1589 append.payload,
1590 &node_id,
1591 ) {
1592 Ok(entry) => entry,
1593 Err(error) => {
1594 return Ok((400, "application/json", json!({"error": error}).to_string()));
1595 }
1596 };
1597 let last_index = control.replication_log.last_index();
1598 let last_term = control.replication_log.last_term();
1599 control.runtime.note_log_position(last_index, last_term);
1600
1601 let new_commit = control.replication_log.compute_commit_index(
1602 control.runtime.commit_index,
1603 control.profile.replication.sync_ack_quorum,
1604 );
1605 control.runtime.advance_commit_index(new_commit);
1606
1607 append_replication_entry_to_store(
1608 db_path,
1609 &entry,
1610 entry.index <= control.runtime.commit_index,
1611 )?;
1612 if new_commit > 0 {
1613 mark_committed_replication_entries(db_path, new_commit)?;
1614 }
1615
1616 let payload = json!({
1617 "entry": entry,
1618 "commit_index": control.runtime.commit_index,
1619 "required_quorum": control.profile.replication.sync_ack_quorum,
1620 "ack_count": control.replication_log.ack_count(control.runtime.last_log_index),
1621 });
1622 Ok((200, "application/json", payload.to_string()))
1623 }
// Authenticated: follower-side replication. It accepts entries from a leader
// whose term is not older than ours, steps down to replica under that leader,
// and appends the entries after the given previous-log position. It then
// advances the commit index to `leader_commit` (if provided) and rewrites the
// stored replication log.
("POST", "/control/v1/replication/receive") => {
    if !authorize_control_request(request, config) {
        return Ok(unauthorized_response());
    }

    let receive = match parse_json_body::<ReplicationReceiveRequest>(request) {
        Ok(payload) => payload,
        Err(error) => {
            return Ok((400, "application/json", json!({"error": error}).to_string()));
        }
    };

    let mut control = state
        .lock()
        .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
    if !control.profile.replication.enabled {
        return Ok((
            409,
            "application/json",
            json!({"error": "replication is not enabled"}).to_string(),
        ));
    }
    // Senders from an older term are rejected; the response carries our term.
    if receive.term < control.runtime.current_term {
        return Ok((
            409,
            "application/json",
            json!({"error": "stale replication term", "term": control.runtime.current_term}).to_string(),
        ));
    }

    // Term adoption and step-down happen before the entries are checked, so
    // they remain in effect in memory even if the append below is rejected.
    control.runtime.adopt_term(receive.term);
    control
        .runtime
        .step_down_to_replica(Some(receive.leader_id.clone()));

    // Any append error (presumably a prev-log mismatch; confirm in
    // `append_remote_entries`) is reported as 409.
    if let Err(error) = control.replication_log.append_remote_entries(
        receive.prev_log_index,
        receive.prev_log_term,
        &receive.entries,
    ) {
        return Ok((409, "application/json", json!({"error": error}).to_string()));
    }

    let last_index = control.replication_log.last_index();
    let last_term = control.replication_log.last_term();
    control.runtime.note_log_position(last_index, last_term);
    if let Some(leader_commit) = receive.leader_commit {
        control.runtime.advance_commit_index(leader_commit);
    }

    // Presumably replaces the stored log with the in-memory copy, tagged with
    // the current commit index.
    rewrite_replication_log_store(
        db_path,
        &control.replication_log,
        control.runtime.commit_index,
    )?;
    let payload = json!({
        "accepted": true,
        "term": control.runtime.current_term,
        "last_log_index": control.runtime.last_log_index,
        "last_log_term": control.runtime.last_log_term,
        "commit_index": control.runtime.commit_index,
    });
    Ok((200, "application/json", payload.to_string()))
}
// Authenticated: records a replica's acknowledgement of a log index, updates
// that replica's progress, and recomputes the commit index against the
// configured ack quorum.
("POST", "/control/v1/replication/ack") => {
    if !authorize_control_request(request, config) {
        return Ok(unauthorized_response());
    }
    let ack = match parse_json_body::<ReplicationAckRequest>(request) {
        Ok(payload) => payload,
        Err(error) => {
            return Ok((400, "application/json", json!({"error": error}).to_string()));
        }
    };

    let mut control = state
        .lock()
        .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
    // Log indices are 1-based; 0 is never a valid entry.
    if ack.index == 0 {
        return Ok((
            400,
            "application/json",
            json!({"error": "ack index must be >= 1"}).to_string(),
        ));
    }
    // `acknowledge` returns the resulting ack count; 0 means the index is not
    // present in the local log.
    let ack_count = control
        .replication_log
        .acknowledge(ack.index, ack.node_id.clone());
    if ack_count == 0 {
        return Ok((
            409,
            "application/json",
            json!({"error": "ack index is outside local replication log"}).to_string(),
        ));
    }
    control
        .replica_progress
        .insert(ack.node_id.clone(), ack.index);

    let new_commit = control.replication_log.compute_commit_index(
        control.runtime.commit_index,
        control.profile.replication.sync_ack_quorum,
    );
    control.runtime.advance_commit_index(new_commit);
    if new_commit > 0 {
        mark_committed_replication_entries(db_path, new_commit)?;
    }

    let payload = json!({
        "node_id": ack.node_id,
        "index": ack.index,
        "ack_count": ack_count,
        "commit_index": control.runtime.commit_index,
        "required_quorum": control.profile.replication.sync_ack_quorum,
    });
    Ok((200, "application/json", payload.to_string()))
}
// Authenticated: a replica reports how far it has applied the log. The node's
// progress is recorded and replication lag is updated. The response returns up
// to 128 entries after `last_applied_index` for the replica to catch up on.
// Missing `node_id` / `last_applied_index` default to "unknown-replica" / 0.
("POST", "/control/v1/replication/reconcile") => {
    if !authorize_control_request(request, config) {
        return Ok(unauthorized_response());
    }
    let reconcile = parse_optional_json_body::<ReplicationReconcileRequest>(request)
        .map_err(std::io::Error::other)?;
    let mut control = state
        .lock()
        .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
    let node_id = reconcile
        .node_id
        .unwrap_or_else(|| "unknown-replica".to_string());
    let last_applied = reconcile.last_applied_index.unwrap_or(0);
    control
        .replica_progress
        .insert(node_id.clone(), last_applied);

    // NOTE(review): a commit index reported by the peer advances this node's
    // commit index. Confirm this is intended, since in leader-based replication
    // the commit index is normally derived from local quorum acks.
    if let Some(peer_commit) = reconcile.commit_index {
        control.runtime.advance_commit_index(peer_commit);
    }
    // An explicit lag wins. Otherwise it is estimated as
    // (commit_index - last_applied) entries * heartbeat_interval_ms.
    if let Some(lag_ms) = reconcile.replication_lag_ms {
        control.runtime.replication_lag_ms = lag_ms;
    } else {
        let lag_entries = control.runtime.commit_index.saturating_sub(last_applied);
        control.runtime.replication_lag_ms =
            lag_entries.saturating_mul(control.profile.replication.heartbeat_interval_ms);
    }

    let missing_from = last_applied.saturating_add(1);
    let missing_entries = control.replication_log.entries_from(missing_from, 128);
    persist_reconcile_event(
        db_path,
        &node_id,
        last_applied,
        control.runtime.commit_index,
        control.runtime.replication_lag_ms,
    )?;

    let payload = json!({
        "node_id": node_id,
        "last_applied_index": last_applied,
        "commit_index": control.runtime.commit_index,
        "missing_entries_count": missing_entries.len(),
        "missing_entries": missing_entries,
        "replication_lag_ms": control.runtime.replication_lag_ms,
    });
    Ok((200, "application/json", payload.to_string()))
}
1790 ("POST", "/control/v1/election/request-vote") => {
1791 if !authorize_control_request(request, config) {
1792 return Ok(unauthorized_response());
1793 }
1794 let vote = match parse_json_body::<ElectionVoteRequest>(request) {
1795 Ok(payload) => payload,
1796 Err(error) => {
1797 return Ok((400, "application/json", json!({"error": error}).to_string()));
1798 }
1799 };
1800
1801 let mut control = state
1802 .lock()
1803 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1804 if !control.profile.replication.enabled {
1805 return Ok((
1806 409,
1807 "application/json",
1808 json!({"error": "election requires replication mode"}).to_string(),
1809 ));
1810 }
1811
1812 let mut granted = false;
1813 let reason: String;
1814 if vote.term < control.runtime.current_term {
1815 reason = "stale_term".to_string();
1816 } else {
1817 control.runtime.adopt_term(vote.term);
1818 if control.runtime.can_grant_vote(
1819 vote.term,
1820 &vote.candidate_id,
1821 vote.candidate_last_log_index,
1822 vote.candidate_last_log_term,
1823 ) {
1824 granted = true;
1825 control
1826 .runtime
1827 .grant_vote(vote.term, vote.candidate_id.clone());
1828 reason = "vote_granted".to_string();
1829 } else if control
1830 .runtime
1831 .voted_for
1832 .as_ref()
1833 .is_some_and(|value| value != &vote.candidate_id)
1834 {
1835 reason = "already_voted".to_string();
1836 } else {
1837 reason = "candidate_log_outdated".to_string();
1838 }
1839 }
1840
1841 persist_vote_event(
1842 db_path,
1843 control.runtime.current_term,
1844 &control.profile.replication.node_id,
1845 &vote.candidate_id,
1846 granted,
1847 &reason,
1848 )?;
1849
1850 let payload = json!({
1851 "term": control.runtime.current_term,
1852 "vote_granted": granted,
1853 "reason": reason,
1854 "voted_for": control.runtime.voted_for,
1855 });
1856 Ok((200, "application/json", payload.to_string()))
1857 }
1858 ("POST", "/control/v1/election/heartbeat") => {
1859 if !authorize_control_request(request, config) {
1860 return Ok(unauthorized_response());
1861 }
1862 let heartbeat = match parse_json_body::<ElectionHeartbeatRequest>(request) {
1863 Ok(payload) => payload,
1864 Err(error) => {
1865 return Ok((400, "application/json", json!({"error": error}).to_string()));
1866 }
1867 };
1868
1869 let mut control = state
1870 .lock()
1871 .map_err(|_| std::io::Error::other("control-plane state lock poisoned"))?;
1872 if heartbeat.term < control.runtime.current_term {
1873 return Ok((
1874 409,
1875 "application/json",
1876 json!({"accepted": false, "reason": "stale_term", "term": control.runtime.current_term}).to_string(),
1877 ));
1878 }
1879
1880 control.runtime.adopt_term(heartbeat.term);
1881 if heartbeat.leader_id != control.profile.replication.node_id {
1882 control
1883 .runtime
1884 .step_down_to_replica(Some(heartbeat.leader_id.clone()));
1885 }
1886 if let Some(leader_last_log_index) = heartbeat.leader_last_log_index
1887 && leader_last_log_index > control.runtime.last_log_index
1888 {
1889 control.runtime.mark_failover_started();
1890 }
1891 control.runtime.mark_heartbeat(
1892 Some(heartbeat.leader_id.clone()),
1893 heartbeat.commit_index,
1894 heartbeat.replication_lag_ms.unwrap_or(0),
1895 );
1896 if control.runtime.failover_in_progress
1897 && let Some(duration_ms) = control.resilience.complete_failover()
1898 {
1899 persist_resilience_event(
1900 db_path,
1901 "failover_completed",
1902 Some(duration_ms),
1903 "heartbeat_stabilized",
1904 )?;
1905 }
1906 if control.runtime.commit_index > 0 {
1907 mark_committed_replication_entries(db_path, control.runtime.commit_index)?;
1908 }
1909
1910 let payload = json!({
1911 "accepted": true,
1912 "term": control.runtime.current_term,
1913 "leader_id": heartbeat.leader_id,
1914 "role": control.runtime.role,
1915 "commit_index": control.runtime.commit_index,
1916 "last_applied_index": control.runtime.last_applied_index,
1917 });
1918 Ok((200, "application/json", payload.to_string()))
1919 }
1920 ("GET", "/v1/openapi.json") => Ok((
1921 200,
1922 "application/json",
1923 openapi_query_document(config.enable_sql_endpoint).to_string(),
1924 )),
1925 ("POST", "/v1/sql") if config.enable_sql_endpoint => {
1926 let sql_request = match parse_json_body::<SqlApiRequest>(request) {
1927 Ok(payload) => payload,
1928 Err(error) => {
1929 let payload = json!({"error": error});
1930 return Ok((400, "application/json", payload.to_string()));
1931 }
1932 };
1933 execute_sql_api_statement(db_path, sql_profile, config, request, sql_request)
1934 }
1935 ("POST", "/v1/query") if config.enable_sql_endpoint => {
1936 let input = match parse_json_body::<QueryApiRequest>(request) {
1937 Ok(payload) => payload,
1938 Err(error) => {
1939 let payload = json!({"error": error});
1940 return Ok((400, "application/json", payload.to_string()));
1941 }
1942 };
1943 match execute_query_api(db, config, request, path, input, false) {
1944 Ok(payload) => Ok((200, "application/json", payload.to_string())),
1945 Err(error)
1946 if matches!(
1947 &error,
1948 crate::SqlRiteError::Io(io_error)
1949 if io_error.kind() == std::io::ErrorKind::PermissionDenied
1950 ) =>
1951 {
1952 Ok((
1953 403,
1954 "application/json",
1955 json!({"error": error.to_string()}).to_string(),
1956 ))
1957 }
1958 Err(error) => Ok((
1959 400,
1960 "application/json",
1961 json!({"error": error.to_string()}).to_string(),
1962 )),
1963 }
1964 }
1965 ("POST", "/v1/query-compact") if config.enable_sql_endpoint => {
1966 let input = match parse_json_body::<QueryApiRequest>(request) {
1967 Ok(payload) => payload,
1968 Err(error) => {
1969 let payload = json!({"error": error});
1970 return Ok((400, "application/json", payload.to_string()));
1971 }
1972 };
1973 match execute_query_api(db, config, request, path, input, true) {
1974 Ok(payload) => Ok((200, "application/json", payload.to_string())),
1975 Err(error)
1976 if matches!(
1977 &error,
1978 crate::SqlRiteError::Io(io_error)
1979 if io_error.kind() == std::io::ErrorKind::PermissionDenied
1980 ) =>
1981 {
1982 Ok((
1983 403,
1984 "application/json",
1985 json!({"error": error.to_string()}).to_string(),
1986 ))
1987 }
1988 Err(error) => Ok((
1989 400,
1990 "application/json",
1991 json!({"error": error.to_string()}).to_string(),
1992 )),
1993 }
1994 }
1995 ("POST", "/v1/rerank-hook") if config.enable_sql_endpoint => {
1996 let input = match parse_json_body::<RerankHookRequest>(request) {
1997 Ok(payload) => payload,
1998 Err(error) => {
1999 let payload = json!({"error": error});
2000 return Ok((400, "application/json", payload.to_string()));
2001 }
2002 };
2003 match execute_rerank_hook_api(db, config, request, path, input) {
2004 Ok(payload) => Ok((200, "application/json", payload.to_string())),
2005 Err(error)
2006 if matches!(
2007 &error,
2008 crate::SqlRiteError::Io(io_error)
2009 if io_error.kind() == std::io::ErrorKind::PermissionDenied
2010 ) =>
2011 {
2012 Ok((
2013 403,
2014 "application/json",
2015 json!({"error": error.to_string()}).to_string(),
2016 ))
2017 }
2018 Err(error) => Ok((
2019 400,
2020 "application/json",
2021 json!({"error": error.to_string()}).to_string(),
2022 )),
2023 }
2024 }
2025 ("POST", "/grpc/sqlrite.v1.QueryService/Sql") if config.enable_sql_endpoint => {
2026 let sql_request = match parse_json_body::<SqlApiRequest>(request) {
2027 Ok(payload) => payload,
2028 Err(error) => {
2029 let payload = json!({"error": error});
2030 return Ok((400, "application/json", payload.to_string()));
2031 }
2032 };
2033 execute_sql_api_statement(db_path, sql_profile, config, request, sql_request)
2034 }
2035 ("POST", "/grpc/sqlrite.v1.QueryService/Query") if config.enable_sql_endpoint => {
2036 let input = match parse_json_body::<QueryApiRequest>(request) {
2037 Ok(payload) => payload,
2038 Err(error) => {
2039 let payload = json!({"error": error});
2040 return Ok((400, "application/json", payload.to_string()));
2041 }
2042 };
2043 match execute_query_api(db, config, request, path, input, false) {
2044 Ok(payload) => Ok((200, "application/json", payload.to_string())),
2045 Err(error)
2046 if matches!(
2047 &error,
2048 crate::SqlRiteError::Io(io_error)
2049 if io_error.kind() == std::io::ErrorKind::PermissionDenied
2050 ) =>
2051 {
2052 Ok((
2053 403,
2054 "application/json",
2055 json!({"error": error.to_string()}).to_string(),
2056 ))
2057 }
2058 Err(error) => Ok((
2059 400,
2060 "application/json",
2061 json!({"error": error.to_string()}).to_string(),
2062 )),
2063 }
2064 }
2065 ("POST", _) if path.starts_with("/control/") => Ok((
2066 404,
2067 "application/json",
2068 json!({"error": "unknown control-plane endpoint"}).to_string(),
2069 )),
2070 ("GET", _) if path.starts_with("/control/") => Ok((
2071 404,
2072 "application/json",
2073 json!({"error": "unknown control-plane endpoint"}).to_string(),
2074 )),
2075 (method, "/v1/sql") if method != "POST" => Ok((
2076 405,
2077 "application/json",
2078 json!({"error": "method not allowed; use POST /v1/sql"}).to_string(),
2079 )),
2080 (method, "/v1/query") if method != "POST" => Ok((
2081 405,
2082 "application/json",
2083 json!({"error": "method not allowed; use POST /v1/query"}).to_string(),
2084 )),
2085 (method, "/v1/query-compact") if method != "POST" => Ok((
2086 405,
2087 "application/json",
2088 json!({"error": "method not allowed; use POST /v1/query-compact"}).to_string(),
2089 )),
2090 (method, "/v1/rerank-hook") if method != "POST" => Ok((
2091 405,
2092 "application/json",
2093 json!({"error": "method not allowed; use POST /v1/rerank-hook"}).to_string(),
2094 )),
2095 (method, "/grpc/sqlrite.v1.QueryService/Sql") if method != "POST" => Ok((
2096 405,
2097 "application/json",
2098 json!({"error": "method not allowed; use POST /grpc/sqlrite.v1.QueryService/Sql"})
2099 .to_string(),
2100 )),
2101 (method, "/grpc/sqlrite.v1.QueryService/Query") if method != "POST" => Ok((
2102 405,
2103 "application/json",
2104 json!({"error": "method not allowed; use POST /grpc/sqlrite.v1.QueryService/Query"})
2105 .to_string(),
2106 )),
2107 _ => Ok((404, "text/plain; charset=utf-8", "not found".to_string())),
2108 }
2109}
2110
2111fn execute_sql_api_statement(
2112 db_path: &Path,
2113 sql_profile: DurabilityProfile,
2114 config: &ServerConfig,
2115 request: &HttpRequest,
2116 input: SqlApiRequest,
2117) -> Result<(u16, &'static str, String)> {
2118 if let Err(response) = authorize_request(config, request, AccessOperation::SqlAdmin, "/v1/sql")
2119 {
2120 return Ok(response);
2121 }
2122
2123 let statement_len = input.statement.len();
2124 match execute_sdk_sql(db_path, sql_profile, input) {
2125 Ok(payload) => {
2126 audit_request(
2127 config,
2128 request,
2129 AccessOperation::SqlAdmin,
2130 true,
2131 json!({"path": request.path.as_str(), "statement_len": statement_len}),
2132 )?;
2133 Ok((200, "application/json", payload.to_string()))
2134 }
2135 Err(error) if error.is_validation() => Ok((
2136 400,
2137 "application/json",
2138 json!({"error": error.to_string()}).to_string(),
2139 )),
2140 Err(error) => Ok((
2141 500,
2142 "application/json",
2143 json!({"error": error.to_string()}).to_string(),
2144 )),
2145 }
2146}
2147
2148fn execute_query_api(
2149 db: &SqlRite,
2150 config: &ServerConfig,
2151 request: &HttpRequest,
2152 path: &str,
2153 mut input: QueryApiRequest,
2154 compact: bool,
2155) -> Result<Value> {
2156 let context = authorize_request(config, request, AccessOperation::Query, path).map_err(
2157 |(_, _, body)| {
2158 std::io::Error::new(
2159 std::io::ErrorKind::PermissionDenied,
2160 extract_error_message(&body),
2161 )
2162 },
2163 )?;
2164
2165 if let Some(context) = context {
2166 let tenant = context.tenant_id.clone();
2167 let filters = input.metadata_filters.get_or_insert_with(HashMap::new);
2168 if let Some(existing) = filters.get("tenant")
2169 && existing != &tenant
2170 {
2171 audit_request(
2172 config,
2173 request,
2174 AccessOperation::Query,
2175 false,
2176 json!({"path": path, "reason": "tenant filter mismatch"}),
2177 )?;
2178 return Err(std::io::Error::new(
2179 std::io::ErrorKind::PermissionDenied,
2180 "tenant filter mismatch",
2181 )
2182 .into());
2183 }
2184 filters.insert("tenant".to_string(), tenant);
2185 }
2186
2187 let envelope = execute_sdk_query(db, input).map_err(std::io::Error::other)?;
2188 audit_request(
2189 config,
2190 request,
2191 AccessOperation::Query,
2192 true,
2193 json!({"path": path, "row_count": envelope.row_count, "compact": compact}),
2194 )?;
2195 if compact {
2196 Ok(compact_query_envelope(envelope))
2197 } else {
2198 Ok(serde_json::to_value(envelope)?)
2199 }
2200}
2201
2202fn compact_query_envelope(envelope: sqlrite_sdk_core::QueryEnvelope<crate::SearchResult>) -> Value {
2203 let mut chunk_ids = Vec::with_capacity(envelope.rows.len());
2204 let mut hybrid_scores = Vec::with_capacity(envelope.rows.len());
2205 let mut vector_scores = Vec::with_capacity(envelope.rows.len());
2206 let mut text_scores = Vec::with_capacity(envelope.rows.len());
2207 let include_doc_ids = envelope.rows.iter().any(|row| !row.doc_id.is_empty());
2208 let include_contents = envelope.rows.iter().any(|row| !row.content.is_empty());
2209 let include_metadata = envelope.rows.iter().any(|row| !row.metadata.is_null());
2210 let mut doc_ids = if include_doc_ids {
2211 Some(Vec::with_capacity(envelope.rows.len()))
2212 } else {
2213 None
2214 };
2215 let mut contents = if include_contents {
2216 Some(Vec::with_capacity(envelope.rows.len()))
2217 } else {
2218 None
2219 };
2220 let mut metadata = if include_metadata {
2221 Some(Vec::with_capacity(envelope.rows.len()))
2222 } else {
2223 None
2224 };
2225
2226 for row in envelope.rows {
2227 chunk_ids.push(row.chunk_id);
2228 hybrid_scores.push(row.hybrid_score);
2229 vector_scores.push(row.vector_score);
2230 text_scores.push(row.text_score);
2231 if let Some(doc_ids) = &mut doc_ids {
2232 doc_ids.push(row.doc_id);
2233 }
2234 if let Some(contents) = &mut contents {
2235 contents.push(row.content);
2236 }
2237 if let Some(metadata_rows) = &mut metadata {
2238 metadata_rows.push(row.metadata);
2239 }
2240 }
2241
2242 let mut payload = serde_json::Map::new();
2243 payload.insert("kind".to_string(), json!("query_compact"));
2244 payload.insert("row_count".to_string(), json!(chunk_ids.len()));
2245 payload.insert("chunk_ids".to_string(), json!(chunk_ids));
2246 payload.insert("hybrid_scores".to_string(), json!(hybrid_scores));
2247 payload.insert("vector_scores".to_string(), json!(vector_scores));
2248 payload.insert("text_scores".to_string(), json!(text_scores));
2249 if let Some(doc_ids) = doc_ids {
2250 payload.insert("doc_ids".to_string(), json!(doc_ids));
2251 }
2252 if let Some(contents) = contents {
2253 payload.insert("contents".to_string(), json!(contents));
2254 }
2255 if let Some(metadata_rows) = metadata {
2256 payload.insert("metadata".to_string(), json!(metadata_rows));
2257 }
2258 Value::Object(payload)
2259}
2260
2261fn execute_rerank_hook_api(
2262 db: &SqlRite,
2263 config: &ServerConfig,
2264 request: &HttpRequest,
2265 path: &str,
2266 input: RerankHookRequest,
2267) -> Result<Value> {
2268 let context = authorize_request(config, request, AccessOperation::Query, path).map_err(
2269 |(_, _, body)| {
2270 std::io::Error::new(
2271 std::io::ErrorKind::PermissionDenied,
2272 extract_error_message(&body),
2273 )
2274 },
2275 )?;
2276
2277 let candidate_count = input.candidate_count.unwrap_or(25).max(1);
2278 let mut metadata_filters = input.metadata_filters.unwrap_or_default();
2279 if let Some(context) = context {
2280 if let Some(existing) = metadata_filters.get("tenant")
2281 && existing != &context.tenant_id
2282 {
2283 audit_request(
2284 config,
2285 request,
2286 AccessOperation::Query,
2287 false,
2288 json!({"path": path, "reason": "tenant filter mismatch"}),
2289 )?;
2290 return Err(std::io::Error::new(
2291 std::io::ErrorKind::PermissionDenied,
2292 "tenant filter mismatch",
2293 )
2294 .into());
2295 }
2296 metadata_filters.insert("tenant".to_string(), context.tenant_id);
2297 }
2298
2299 let request_model = crate::SearchRequest {
2300 query_text: input.query_text,
2301 query_embedding: input.query_embedding,
2302 top_k: candidate_count,
2303 alpha: input.alpha.unwrap_or(sqlrite_sdk_core::DEFAULT_ALPHA),
2304 candidate_limit: input
2305 .candidate_limit
2306 .unwrap_or(sqlrite_sdk_core::DEFAULT_CANDIDATE_LIMIT)
2307 .max(candidate_count),
2308 query_profile: parse_query_profile_api(input.query_profile.as_deref())
2309 .map_err(SqlRiteError::InvalidBenchmarkConfig)?,
2310 metadata_filters,
2311 doc_id: input.doc_id,
2312 ..crate::SearchRequest::default()
2313 };
2314 request_model
2315 .validate()
2316 .map_err(|error| SqlRiteError::InvalidBenchmarkConfig(error.to_string()))?;
2317 let rows = db.search(request_model)?;
2318 audit_request(
2319 config,
2320 request,
2321 AccessOperation::Query,
2322 true,
2323 json!({"path": path, "row_count": rows.len(), "kind": "rerank_hook"}),
2324 )?;
2325 Ok(json!({
2326 "kind": "rerank_hook",
2327 "row_count": rows.len(),
2328 "rows": rows,
2329 }))
2330}
2331
2332fn parse_query_profile_api(value: Option<&str>) -> std::result::Result<QueryProfile, String> {
2333 match value.map(str::trim).filter(|value| !value.is_empty()) {
2334 None => Ok(QueryProfile::Balanced),
2335 Some("balanced") => Ok(QueryProfile::Balanced),
2336 Some("latency") => Ok(QueryProfile::Latency),
2337 Some("recall") => Ok(QueryProfile::Recall),
2338 Some(other) => Err(format!(
2339 "invalid query_profile `{other}`; expected balanced|latency|recall"
2340 )),
2341 }
2342}
2343
2344fn openapi_query_document(sql_enabled: bool) -> Value {
2345 let mut paths = serde_json::Map::new();
2346 if sql_enabled {
2347 paths.insert(
2348 "/v1/sql".to_string(),
2349 json!({
2350 "post": {
2351 "summary": "Execute retrieval SQL statement",
2352 "requestBody": {
2353 "required": true,
2354 "content": {
2355 "application/json": {
2356 "schema": {"$ref": "#/components/schemas/SqlRequest"}
2357 }
2358 }
2359 },
2360 "responses": {
2361 "200": {"description": "Statement executed"},
2362 "400": {"description": "Invalid SQL request"}
2363 }
2364 }
2365 }),
2366 );
2367 paths.insert(
2368 "/v1/query".to_string(),
2369 json!({
2370 "post": {
2371 "summary": "Run semantic/lexical/hybrid retrieval query",
2372 "requestBody": {
2373 "required": true,
2374 "content": {
2375 "application/json": {
2376 "schema": {"$ref": "#/components/schemas/QueryRequest"}
2377 }
2378 }
2379 },
2380 "responses": {
2381 "200": {"description": "Query results"},
2382 "400": {"description": "Invalid query request"}
2383 }
2384 }
2385 }),
2386 );
2387 paths.insert(
2388 "/v1/query-compact".to_string(),
2389 json!({
2390 "post": {
2391 "summary": "Run retrieval query with compact array-oriented response for agents and benchmarks",
2392 "requestBody": {
2393 "required": true,
2394 "content": {
2395 "application/json": {
2396 "schema": {"$ref": "#/components/schemas/QueryRequest"}
2397 }
2398 }
2399 },
2400 "responses": {
2401 "200": {"description": "Compact query results"},
2402 "400": {"description": "Invalid query request"}
2403 }
2404 }
2405 }),
2406 );
2407 paths.insert(
2408 "/v1/rerank-hook".to_string(),
2409 json!({
2410 "post": {
2411 "summary": "Produce scored candidate features for external rerankers",
2412 "requestBody": {
2413 "required": true,
2414 "content": {
2415 "application/json": {
2416 "schema": {"$ref": "#/components/schemas/RerankHookRequest"}
2417 }
2418 }
2419 },
2420 "responses": {
2421 "200": {"description": "Rerank candidate payload"},
2422 "400": {"description": "Invalid rerank hook request"}
2423 }
2424 }
2425 }),
2426 );
2427 paths.insert(
2428 "/grpc/sqlrite.v1.QueryService/Sql".to_string(),
2429 json!({
2430 "post": {
2431 "summary": "gRPC-compat SQL query method over HTTP JSON bridge",
2432 "requestBody": {
2433 "required": true,
2434 "content": {
2435 "application/json": {
2436 "schema": {"$ref": "#/components/schemas/SqlRequest"}
2437 }
2438 }
2439 },
2440 "responses": {
2441 "200": {"description": "Statement executed"},
2442 "400": {"description": "Invalid SQL request"}
2443 }
2444 }
2445 }),
2446 );
2447 paths.insert(
2448 "/grpc/sqlrite.v1.QueryService/Query".to_string(),
2449 json!({
2450 "post": {
2451 "summary": "gRPC-compat retrieval query method over HTTP JSON bridge",
2452 "requestBody": {
2453 "required": true,
2454 "content": {
2455 "application/json": {
2456 "schema": {"$ref": "#/components/schemas/QueryRequest"}
2457 }
2458 }
2459 },
2460 "responses": {
2461 "200": {"description": "Query results"},
2462 "400": {"description": "Invalid query request"}
2463 }
2464 }
2465 }),
2466 );
2467 }
2468 paths.insert(
2469 "/v1/openapi.json".to_string(),
2470 json!({
2471 "get": {
2472 "summary": "Fetch OpenAPI document for query surfaces",
2473 "responses": {
2474 "200": {"description": "OpenAPI document"}
2475 }
2476 }
2477 }),
2478 );
2479
2480 json!({
2481 "openapi": "3.1.0",
2482 "info": {
2483 "title": "SQLRite Query API",
2484 "version": env!("CARGO_PKG_VERSION"),
2485 "description": "OpenAPI baseline for SQL and retrieval query surfaces."
2486 },
2487 "paths": paths,
2488 "components": {
2489 "schemas": {
2490 "SqlRequest": {
2491 "type": "object",
2492 "required": ["statement"],
2493 "properties": {
2494 "statement": {"type": "string"}
2495 }
2496 },
2497 "QueryRequest": {
2498 "type": "object",
2499 "properties": {
2500 "query_text": {"type": "string"},
2501 "query_embedding": {
2502 "type": "array",
2503 "items": {"type": "number"}
2504 },
2505 "top_k": {"type": "integer", "minimum": 1},
2506 "alpha": {"type": "number", "minimum": 0.0, "maximum": 1.0},
2507 "candidate_limit": {"type": "integer", "minimum": 1},
2508 "include_payloads": {"type": "boolean"},
2509 "query_profile": {
2510 "type": "string",
2511 "enum": ["balanced", "latency", "recall"]
2512 },
2513 "metadata_filters": {
2514 "type": "object",
2515 "additionalProperties": {"type": "string"}
2516 },
2517 "doc_id": {"type": "string"}
2518 }
2519 },
2520 "RerankHookRequest": {
2521 "type": "object",
2522 "properties": {
2523 "query_text": {"type": "string"},
2524 "query_embedding": {
2525 "type": "array",
2526 "items": {"type": "number"}
2527 },
2528 "candidate_count": {"type": "integer", "minimum": 1},
2529 "alpha": {"type": "number", "minimum": 0.0, "maximum": 1.0},
2530 "candidate_limit": {"type": "integer", "minimum": 1},
2531 "query_profile": {
2532 "type": "string",
2533 "enum": ["balanced", "latency", "recall"]
2534 },
2535 "metadata_filters": {
2536 "type": "object",
2537 "additionalProperties": {"type": "string"}
2538 },
2539 "doc_id": {"type": "string"}
2540 }
2541 }
2542 }
2543 }
2544 })
2545}
2546
2547fn split_path_and_query(path: &str) -> (&str, HashMap<String, String>) {
2548 let mut parts = path.splitn(2, '?');
2549 let raw_path = parts.next().unwrap_or("/");
2550 let query = parts.next().unwrap_or_default();
2551 (raw_path, parse_query_params(query))
2552}
2553
/// Parses a raw query string (`a=1&b=two`) into a key → value map.
///
/// * Pairs are split on `&`; empty pairs (`a=1&&b=2`) are skipped.
/// * Each pair splits on its first `=`; a pair without `=` maps its key to `""`.
/// * Keys and values are trimmed, then percent-decoded (`%2F` → `/`). Pairs whose trimmed
///   key is empty are skipped.
/// * `+` is kept literally rather than turned into a space. That substitution belongs only to
///   HTML form encoding, and a literal `+` can appear in unencoded values such as UTC offsets.
/// * When a key repeats, the last occurrence wins.
fn parse_query_params(raw: &str) -> HashMap<String, String> {
    raw.split('&')
        .filter(|pair| !pair.is_empty())
        .filter_map(|pair| {
            let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
            let key = key.trim();
            if key.is_empty() {
                return None;
            }
            Some((
                percent_decode_query_component(key),
                percent_decode_query_component(value.trim()),
            ))
        })
        .collect()
}

/// Decodes `%XX` escapes in a single query-string component.
///
/// Malformed escapes (`%`, `%4`, `%zz`) are kept literally. If the decoded bytes are not valid
/// UTF-8, the component is returned undecoded rather than lossily replaced.
fn percent_decode_query_component(component: &str) -> String {
    if !component.contains('%') {
        return component.to_string();
    }
    let bytes = component.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut index = 0;
    while let Some(&byte) = bytes.get(index) {
        // `get` with a range returns None when fewer than two bytes follow the `%`.
        let escaped = if byte == b'%' {
            bytes
                .get(index + 1..index + 3)
                .and_then(decode_query_hex_pair)
        } else {
            None
        };
        match escaped {
            Some(value) => {
                decoded.push(value);
                index += 3;
            }
            None => {
                decoded.push(byte);
                index += 1;
            }
        }
    }
    String::from_utf8(decoded).unwrap_or_else(|_| component.to_string())
}

/// Converts exactly two ASCII hex digits into the byte they encode.
fn decode_query_hex_pair(pair: &[u8]) -> Option<u8> {
    let [high, low] = pair else {
        return None;
    };
    let high = char::from(*high).to_digit(16)?;
    let low = char::from(*low).to_digit(16)?;
    u8::try_from(high * 16 + low).ok()
}
2568
2569fn authorize_control_request(request: &HttpRequest, config: &ServerConfig) -> bool {
2570 let Some(expected) = &config.control_api_token else {
2571 return true;
2572 };
2573
2574 request
2575 .headers
2576 .get("x-sqlrite-control-token")
2577 .is_some_and(|provided| provided == expected)
2578}
2579
2580fn unauthorized_response() -> (u16, &'static str, String) {
2581 (
2582 401,
2583 "application/json",
2584 json!({"error": "unauthorized control-plane request"}).to_string(),
2585 )
2586}
2587
2588fn authorize_request(
2589 config: &ServerConfig,
2590 request: &HttpRequest,
2591 operation: AccessOperation,
2592 path: &str,
2593) -> std::result::Result<Option<AccessContext>, (u16, &'static str, String)> {
2594 if !config.security.enabled() {
2595 return Ok(None);
2596 }
2597
2598 let Some(context) = extract_access_context(request, config) else {
2599 let response = (
2600 401,
2601 "application/json",
2602 json!({"error": "missing auth context headers"}).to_string(),
2603 );
2604 let _ = audit_request(
2605 config,
2606 request,
2607 operation,
2608 false,
2609 json!({"path": path, "reason": "missing_auth_context"}),
2610 );
2611 return Err(response);
2612 };
2613
2614 if let Some(policy) = &config.security.policy
2615 && let Err(error) = policy.authorize(&context, operation, &context.tenant_id)
2616 {
2617 let response = (
2618 403,
2619 "application/json",
2620 json!({"error": error.to_string()}).to_string(),
2621 );
2622 let _ = audit_request(
2623 config,
2624 request,
2625 operation,
2626 false,
2627 json!({"path": path, "reason": error.to_string()}),
2628 );
2629 return Err(response);
2630 }
2631
2632 Ok(Some(context))
2633}
2634
2635fn extract_access_context(request: &HttpRequest, config: &ServerConfig) -> Option<AccessContext> {
2636 let actor_id = request.headers.get("x-sqlrite-actor-id").cloned();
2637 let tenant_id = request.headers.get("x-sqlrite-tenant-id").cloned();
2638 let roles = request
2639 .headers
2640 .get("x-sqlrite-roles")
2641 .map(|value| {
2642 value
2643 .split(',')
2644 .map(str::trim)
2645 .filter(|value| !value.is_empty())
2646 .map(str::to_string)
2647 .collect::<Vec<_>>()
2648 })
2649 .unwrap_or_default();
2650
2651 match (actor_id, tenant_id) {
2652 (Some(actor_id), Some(tenant_id)) => {
2653 Some(AccessContext::new(actor_id, tenant_id).with_roles(roles))
2654 }
2655 _ if config.security.require_auth_context || config.security.enabled() => None,
2656 _ => None,
2657 }
2658}
2659
2660fn audit_request(
2661 config: &ServerConfig,
2662 request: &HttpRequest,
2663 operation: AccessOperation,
2664 allowed: bool,
2665 detail: Value,
2666) -> Result<()> {
2667 let Some(path) = &config.security.audit_log_path else {
2668 return Ok(());
2669 };
2670
2671 let context = extract_access_context(request, config)
2672 .unwrap_or_else(|| AccessContext::new("anonymous", "unknown"));
2673 let logger = JsonlAuditLogger::new(path, config.security.audit_redacted_fields.clone())?;
2674 logger.log(&AuditEvent {
2675 unix_ms: unix_ms_now(),
2676 actor_id: context.actor_id,
2677 tenant_id: context.tenant_id,
2678 operation,
2679 allowed,
2680 detail,
2681 })
2682}
2683
2684fn security_summary_json(config: &ServerConfig) -> Value {
2685 json!({
2686 "enabled": config.security.enabled(),
2687 "secure_defaults": config.security.secure_defaults,
2688 "require_auth_context": config.security.require_auth_context,
2689 "audit_log_path": config.security.audit_log_path,
2690 "rbac_roles": config.security.policy.as_ref().map(|policy| policy.role_names()).unwrap_or_default(),
2691 })
2692}
2693
2694fn extract_error_message(body: &str) -> String {
2695 serde_json::from_str::<Value>(body)
2696 .ok()
2697 .and_then(|value| {
2698 value
2699 .get("error")
2700 .and_then(Value::as_str)
2701 .map(str::to_string)
2702 })
2703 .unwrap_or_else(|| body.to_string())
2704}
2705
2706fn parse_json_body<T: for<'de> Deserialize<'de>>(
2707 request: &HttpRequest,
2708) -> std::result::Result<T, String> {
2709 if request.body.is_empty() {
2710 return Err("JSON body is required".to_string());
2711 }
2712 serde_json::from_slice::<T>(&request.body)
2713 .map_err(|error| format!("invalid JSON body: {error}"))
2714}
2715
2716fn parse_optional_json_body<T>(request: &HttpRequest) -> std::result::Result<T, String>
2717where
2718 T: for<'de> Deserialize<'de> + Default,
2719{
2720 if request.body.is_empty() {
2721 return Ok(T::default());
2722 }
2723 parse_json_body::<T>(request)
2724}
2725
/// Serializable outcome of an automatic failover promotion, produced by
/// `maybe_trigger_automatic_failover`.
#[derive(Debug, serde::Serialize)]
struct AutoFailoverEvent {
    /// Whether this node was promoted; `maybe_trigger_automatic_failover` sets it to `true`.
    promoted: bool,
    /// Reason recorded for the failover (defaults to `automatic_failover_timeout`).
    reason: String,
    /// Runtime `current_term` after promotion.
    term: u64,
    /// Node id of the new leader (the promoted local node).
    leader_id: String,
    /// Duration returned by `complete_failover`, if it reported one.
    failover_duration_ms: Option<u64>,
}
2734
/// Parameters for one automatic-failover evaluation pass.
#[derive(Debug)]
struct AutoFailoverEvalInput {
    /// Promote even when the election timeout has not elapsed. Also allows promotion when no
    /// heartbeat has ever been observed.
    force: bool,
    /// Overrides the measured milliseconds since the last heartbeat.
    simulated_elapsed_ms: Option<u64>,
    /// Reason to record; `None` falls back to `automatic_failover_timeout`.
    reason: Option<String>,
}
2741
/// Promotes this node to primary when automatic failover is configured and the leader's
/// heartbeat has timed out, or when `input.force` is set.
///
/// Returns `Ok(None)` without mutating anything when any of these hold:
/// * replication is disabled;
/// * the failover mode is not `Automatic`;
/// * this node is already `Primary`;
/// * no timeout condition holds.
///
/// Otherwise, in order, it:
/// 1. marks the failover started and persists a `failover_started` resilience event;
/// 2. switches the configured and runtime role to primary, persists `failover_completed` when
///    a duration is reported, and stores the runtime as the `last_role_transition` marker;
/// 3. appends an `automatic_failover_promote` leader event to the replication log, recomputes
///    the commit index with the sync-ack quorum, and persists the entry and any newly
///    committed entries.
///
/// NOTE(review): each step persists separately, without a transaction. An error midway leaves
/// in-memory and stored state partially updated. Confirm callers tolerate this.
fn maybe_trigger_automatic_failover(
    db_path: &Path,
    control: &mut ControlPlaneState,
    input: AutoFailoverEvalInput,
) -> Result<Option<AutoFailoverEvent>> {
    if !control.profile.replication.enabled {
        return Ok(None);
    }
    if control.profile.replication.failover_mode != FailoverMode::Automatic {
        return Ok(None);
    }
    if control.runtime.role == ServerRole::Primary {
        return Ok(None);
    }

    // Elapsed time since the last heartbeat: a simulated value wins. With no heartbeat ever
    // seen, only a forced evaluation proceeds, using a value just past the election timeout.
    let heartbeat_elapsed_ms = if let Some(simulated) = input.simulated_elapsed_ms {
        simulated
    } else if let Some(last) = control.runtime.last_heartbeat_unix_ms {
        unix_ms_now().saturating_sub(last)
    } else if input.force {
        control
            .profile
            .replication
            .election_timeout_ms
            .saturating_add(1)
    } else {
        return Ok(None);
    };
    let timed_out = heartbeat_elapsed_ms >= control.profile.replication.election_timeout_ms;
    if !input.force && !timed_out {
        return Ok(None);
    }

    let reason = input
        .reason
        .unwrap_or_else(|| "automatic_failover_timeout".to_string());
    control.runtime.mark_failover_started();
    control.resilience.start_failover();
    persist_resilience_event(db_path, "failover_started", None, &reason)?;

    // Promote locally: both the configured profile role and the runtime state change.
    let node_id = control.profile.replication.node_id.clone();
    control.profile.replication.role = ServerRole::Primary;
    control.runtime.promote_to_primary(node_id.clone());
    let duration = control.resilience.complete_failover();
    if let Some(duration_ms) = duration {
        persist_resilience_event(
            db_path,
            "failover_completed",
            Some(duration_ms),
            "automatic_failover_promote",
        )?;
    }
    persist_runtime_marker(
        db_path,
        "last_role_transition",
        &serde_json::to_string(&control.runtime)?,
    )?;

    // Record the promotion in the replication log; term is floored at 1.
    let entry = control
        .replication_log
        .append_leader_event(
            control.runtime.current_term.max(1),
            &node_id,
            "automatic_failover_promote".to_string(),
            json!({
                "reason": reason,
                "heartbeat_elapsed_ms": heartbeat_elapsed_ms,
                "election_timeout_ms": control.profile.replication.election_timeout_ms,
            }),
            &node_id,
        )
        .map_err(std::io::Error::other)?;
    let last_index = control.replication_log.last_index();
    let last_term = control.replication_log.last_term();
    control.runtime.note_log_position(last_index, last_term);
    let new_commit = control.replication_log.compute_commit_index(
        control.runtime.commit_index,
        control.profile.replication.sync_ack_quorum,
    );
    control.runtime.advance_commit_index(new_commit);
    // Persist the entry, flagged committed if the (possibly advanced) commit index covers it.
    append_replication_entry_to_store(
        db_path,
        &entry,
        entry.index <= control.runtime.commit_index,
    )?;
    if new_commit > 0 {
        mark_committed_replication_entries(db_path, new_commit)?;
    }

    Ok(Some(AutoFailoverEvent {
        promoted: true,
        reason,
        term: control.runtime.current_term,
        leader_id: node_id,
        failover_duration_ms: duration,
    }))
}
2839
2840fn chaos_blocking_response(
2841 control: &mut ControlPlaneState,
2842 request: &HttpRequest,
2843 path: &str,
2844) -> Option<(u16, &'static str, String)> {
2845 if control.chaos.has(ChaosScenario::NodeCrash) {
2846 control.resilience.chaos_blocked_requests_total = control
2847 .resilience
2848 .chaos_blocked_requests_total
2849 .saturating_add(1);
2850 return Some((
2851 503,
2852 "application/json",
2853 json!({
2854 "error": "chaos fault active: node_crash",
2855 "path": path,
2856 })
2857 .to_string(),
2858 ));
2859 }
2860
2861 if control.chaos.has(ChaosScenario::DiskFull) {
2862 let blocks = path == "/control/v1/replication/append"
2863 || path == "/v1/sql"
2864 || path == "/grpc/sqlrite.v1.QueryService/Sql"
2865 || path == "/control/v1/recovery/start"
2866 || path == "/control/v1/recovery/mark-restored"
2867 || path == "/control/v1/recovery/snapshot"
2868 || path == "/control/v1/recovery/verify-restore"
2869 || path == "/control/v1/recovery/prune-snapshots";
2870 if blocks && request.method == "POST" {
2871 control.resilience.chaos_blocked_requests_total = control
2872 .resilience
2873 .chaos_blocked_requests_total
2874 .saturating_add(1);
2875 return Some((
2876 507,
2877 "application/json",
2878 json!({
2879 "error": "chaos fault active: disk_full",
2880 "path": path,
2881 })
2882 .to_string(),
2883 ));
2884 }
2885 }
2886
2887 if control.chaos.has(ChaosScenario::PartitionSubset)
2888 && matches!(
2889 path,
2890 "/control/v1/election/heartbeat" | "/control/v1/replication/receive"
2891 )
2892 {
2893 control.resilience.chaos_blocked_requests_total = control
2894 .resilience
2895 .chaos_blocked_requests_total
2896 .saturating_add(1);
2897 return Some((
2898 503,
2899 "application/json",
2900 json!({
2901 "error": "chaos fault active: partition_subset",
2902 "path": path,
2903 })
2904 .to_string(),
2905 ));
2906 }
2907
2908 None
2909}
2910
2911fn restore_replication_state(db_path: &Path, control: &mut ControlPlaneState) -> Result<()> {
2912 let conn = Connection::open(db_path)?;
2913 ensure_replication_catalog(&conn)?;
2914
2915 let mut stmt = conn.prepare(
2916 "
2917 SELECT idx, term, leader_id, operation, payload_json, checksum, created_at_unix_ms, committed
2918 FROM replication_log
2919 ORDER BY idx ASC
2920 ",
2921 )?;
2922 let mut rows = stmt.query([])?;
2923 let mut entries = Vec::new();
2924 let mut commit_index = 0u64;
2925
2926 while let Some(row) = rows.next()? {
2927 let index: u64 = row.get(0)?;
2928 let term: u64 = row.get(1)?;
2929 let leader_id: String = row.get(2)?;
2930 let operation: String = row.get(3)?;
2931 let payload_json: String = row.get(4)?;
2932 let checksum: String = row.get(5)?;
2933 let created_at_unix_ms: u64 = row.get(6)?;
2934 let committed: i64 = row.get(7)?;
2935 let payload = serde_json::from_str::<Value>(&payload_json)?;
2936 entries.push(ReplicationLogEntry {
2937 index,
2938 term,
2939 leader_id,
2940 operation,
2941 payload,
2942 checksum,
2943 created_at_unix_ms,
2944 });
2945 if committed == 1 {
2946 commit_index = index;
2947 }
2948 }
2949
2950 control.replication_log =
2951 ReplicationLog::from_entries(entries).map_err(std::io::Error::other)?;
2952 control.runtime.note_log_position(
2953 control.replication_log.last_index(),
2954 control.replication_log.last_term(),
2955 );
2956 control.runtime.advance_commit_index(commit_index);
2957 control.runtime.current_term = control
2958 .runtime
2959 .current_term
2960 .max(control.runtime.last_log_term);
2961
2962 let voted_for: Option<String> = conn
2963 .query_row(
2964 "
2965 SELECT candidate_id
2966 FROM election_votes
2967 WHERE term = ?1 AND voter_id = ?2 AND granted = 1
2968 ORDER BY rowid DESC
2969 LIMIT 1
2970 ",
2971 params![
2972 control.runtime.current_term,
2973 control.profile.replication.node_id.as_str()
2974 ],
2975 |row| row.get(0),
2976 )
2977 .optional()?;
2978 control.runtime.voted_for = voted_for;
2979
2980 control.resilience.failover_events_total = conn.query_row(
2981 "SELECT COUNT(*) FROM ha_resilience_events WHERE event_type = 'failover_started'",
2982 [],
2983 |row| row.get(0),
2984 )?;
2985 control.resilience.failover_completed_total = conn.query_row(
2986 "SELECT COUNT(*) FROM ha_resilience_events WHERE event_type = 'failover_completed'",
2987 [],
2988 |row| row.get(0),
2989 )?;
2990 control.resilience.cumulative_failover_duration_ms = conn
2991 .query_row(
2992 "SELECT COALESCE(SUM(duration_ms), 0) FROM ha_resilience_events WHERE event_type = 'failover_completed'",
2993 [],
2994 |row| row.get::<_, i64>(0),
2995 )?
2996 .max(0) as u128;
2997 control.resilience.last_failover_duration_ms = conn
2998 .query_row(
2999 "SELECT duration_ms FROM ha_resilience_events WHERE event_type = 'failover_completed' ORDER BY rowid DESC LIMIT 1",
3000 [],
3001 |row| row.get::<_, i64>(0),
3002 )
3003 .optional()?
3004 .map(|value| value.max(0) as u64);
3005
3006 control.resilience.restore_events_total = conn.query_row(
3007 "SELECT COUNT(*) FROM ha_resilience_events WHERE event_type = 'restore_started'",
3008 [],
3009 |row| row.get(0),
3010 )?;
3011 control.resilience.restore_completed_total = conn.query_row(
3012 "SELECT COUNT(*) FROM ha_resilience_events WHERE event_type = 'restore_completed'",
3013 [],
3014 |row| row.get(0),
3015 )?;
3016 control.resilience.cumulative_restore_duration_ms = conn
3017 .query_row(
3018 "SELECT COALESCE(SUM(duration_ms), 0) FROM ha_resilience_events WHERE event_type = 'restore_completed'",
3019 [],
3020 |row| row.get::<_, i64>(0),
3021 )?
3022 .max(0) as u128;
3023 control.resilience.last_restore_duration_ms = conn
3024 .query_row(
3025 "SELECT duration_ms FROM ha_resilience_events WHERE event_type = 'restore_completed' ORDER BY rowid DESC LIMIT 1",
3026 [],
3027 |row| row.get::<_, i64>(0),
3028 )
3029 .optional()?
3030 .map(|value| value.max(0) as u64);
3031
3032 control.resilience.chaos_injections_total = conn.query_row(
3033 "SELECT COUNT(*) FROM ha_chaos_events WHERE action = 'inject'",
3034 [],
3035 |row| row.get(0),
3036 )?;
3037
3038 Ok(())
3039}
3040
3041fn ensure_replication_catalog(conn: &Connection) -> Result<()> {
3042 conn.execute_batch(
3043 "
3044 CREATE TABLE IF NOT EXISTS replication_log (
3045 idx INTEGER PRIMARY KEY,
3046 term INTEGER NOT NULL,
3047 leader_id TEXT NOT NULL,
3048 operation TEXT NOT NULL,
3049 payload_json TEXT NOT NULL,
3050 checksum TEXT NOT NULL,
3051 created_at_unix_ms INTEGER NOT NULL,
3052 committed INTEGER NOT NULL DEFAULT 0
3053 );
3054
3055 CREATE TABLE IF NOT EXISTS election_votes (
3056 term INTEGER NOT NULL,
3057 voter_id TEXT NOT NULL,
3058 candidate_id TEXT NOT NULL,
3059 granted INTEGER NOT NULL,
3060 reason TEXT NOT NULL,
3061 created_at TEXT NOT NULL DEFAULT (datetime('now'))
3062 );
3063
3064 CREATE TABLE IF NOT EXISTS replication_reconcile_events (
3065 node_id TEXT NOT NULL,
3066 last_applied_index INTEGER NOT NULL,
3067 commit_index INTEGER NOT NULL,
3068 replication_lag_ms INTEGER NOT NULL,
3069 created_at TEXT NOT NULL DEFAULT (datetime('now'))
3070 );
3071
3072 CREATE TABLE IF NOT EXISTS ha_runtime_markers (
3073 key TEXT PRIMARY KEY,
3074 value TEXT NOT NULL,
3075 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
3076 );
3077
3078 CREATE TABLE IF NOT EXISTS ha_resilience_events (
3079 event_type TEXT NOT NULL,
3080 duration_ms INTEGER,
3081 note TEXT,
3082 created_at_unix_ms INTEGER NOT NULL
3083 );
3084
3085 CREATE TABLE IF NOT EXISTS ha_chaos_events (
3086 action TEXT NOT NULL,
3087 scenario TEXT NOT NULL,
3088 created_at_unix_ms INTEGER NOT NULL
3089 );
3090 ",
3091 )?;
3092 Ok(())
3093}
3094
3095fn append_replication_entry_to_store(
3096 db_path: &Path,
3097 entry: &ReplicationLogEntry,
3098 committed: bool,
3099) -> Result<()> {
3100 let conn = Connection::open(db_path)?;
3101 ensure_replication_catalog(&conn)?;
3102 conn.execute(
3103 "
3104 INSERT OR REPLACE INTO replication_log
3105 (idx, term, leader_id, operation, payload_json, checksum, created_at_unix_ms, committed)
3106 VALUES
3107 (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
3108 ",
3109 params![
3110 entry.index,
3111 entry.term,
3112 entry.leader_id,
3113 entry.operation,
3114 serde_json::to_string(&entry.payload)?,
3115 entry.checksum,
3116 entry.created_at_unix_ms,
3117 if committed { 1 } else { 0 },
3118 ],
3119 )?;
3120 Ok(())
3121}
3122
3123fn rewrite_replication_log_store(
3124 db_path: &Path,
3125 log: &ReplicationLog,
3126 commit_index: u64,
3127) -> Result<()> {
3128 let mut conn = Connection::open(db_path)?;
3129 ensure_replication_catalog(&conn)?;
3130 let tx = conn.transaction()?;
3131 tx.execute("DELETE FROM replication_log", [])?;
3132
3133 for entry in log.entries() {
3134 tx.execute(
3135 "
3136 INSERT INTO replication_log
3137 (idx, term, leader_id, operation, payload_json, checksum, created_at_unix_ms, committed)
3138 VALUES
3139 (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
3140 ",
3141 params![
3142 entry.index,
3143 entry.term,
3144 entry.leader_id,
3145 entry.operation,
3146 serde_json::to_string(&entry.payload)?,
3147 entry.checksum,
3148 entry.created_at_unix_ms,
3149 if entry.index <= commit_index { 1 } else { 0 },
3150 ],
3151 )?;
3152 }
3153
3154 tx.commit()?;
3155 Ok(())
3156}
3157
3158fn mark_committed_replication_entries(db_path: &Path, commit_index: u64) -> Result<()> {
3159 let conn = Connection::open(db_path)?;
3160 ensure_replication_catalog(&conn)?;
3161 conn.execute(
3162 "UPDATE replication_log SET committed = 1 WHERE idx <= ?1",
3163 params![commit_index],
3164 )?;
3165 Ok(())
3166}
3167
3168fn persist_vote_event(
3169 db_path: &Path,
3170 term: u64,
3171 voter_id: &str,
3172 candidate_id: &str,
3173 granted: bool,
3174 reason: &str,
3175) -> Result<()> {
3176 let conn = Connection::open(db_path)?;
3177 ensure_replication_catalog(&conn)?;
3178 conn.execute(
3179 "
3180 INSERT INTO election_votes (term, voter_id, candidate_id, granted, reason)
3181 VALUES (?1, ?2, ?3, ?4, ?5)
3182 ",
3183 params![
3184 term,
3185 voter_id,
3186 candidate_id,
3187 if granted { 1 } else { 0 },
3188 reason
3189 ],
3190 )?;
3191 Ok(())
3192}
3193
3194fn persist_reconcile_event(
3195 db_path: &Path,
3196 node_id: &str,
3197 last_applied_index: u64,
3198 commit_index: u64,
3199 replication_lag_ms: u64,
3200) -> Result<()> {
3201 let conn = Connection::open(db_path)?;
3202 ensure_replication_catalog(&conn)?;
3203 conn.execute(
3204 "
3205 INSERT INTO replication_reconcile_events (node_id, last_applied_index, commit_index, replication_lag_ms)
3206 VALUES (?1, ?2, ?3, ?4)
3207 ",
3208 params![node_id, last_applied_index, commit_index, replication_lag_ms],
3209 )?;
3210 Ok(())
3211}
3212
3213fn persist_runtime_marker(db_path: &Path, key: &str, value: &str) -> Result<()> {
3214 let conn = Connection::open(db_path)?;
3215 ensure_replication_catalog(&conn)?;
3216 conn.execute(
3217 "
3218 INSERT INTO ha_runtime_markers (key, value, updated_at)
3219 VALUES (?1, ?2, datetime('now'))
3220 ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = datetime('now')
3221 ",
3222 params![key, value],
3223 )?;
3224 Ok(())
3225}
3226
3227fn persist_resilience_event(
3228 db_path: &Path,
3229 event_type: &str,
3230 duration_ms: Option<u64>,
3231 note: &str,
3232) -> Result<()> {
3233 let conn = Connection::open(db_path)?;
3234 ensure_replication_catalog(&conn)?;
3235 conn.execute(
3236 "
3237 INSERT INTO ha_resilience_events (event_type, duration_ms, note, created_at_unix_ms)
3238 VALUES (?1, ?2, ?3, ?4)
3239 ",
3240 params![
3241 event_type,
3242 duration_ms.map(|value| value as i64),
3243 note,
3244 unix_ms_now() as i64,
3245 ],
3246 )?;
3247 Ok(())
3248}
3249
3250fn persist_chaos_event(db_path: &Path, action: &str, scenario: &str) -> Result<()> {
3251 let conn = Connection::open(db_path)?;
3252 ensure_replication_catalog(&conn)?;
3253 conn.execute(
3254 "
3255 INSERT INTO ha_chaos_events (action, scenario, created_at_unix_ms)
3256 VALUES (?1, ?2, ?3)
3257 ",
3258 params![action, scenario, unix_ms_now() as i64],
3259 )?;
3260 Ok(())
3261}
3262
/// Returns the number of milliseconds since the Unix epoch, according to the system clock.
///
/// If the clock reports a time before the epoch, `0` is returned. The `u128`
/// millisecond count is narrowed to `u64` with an `as` cast.
fn unix_ms_now() -> u64 {
    let now = SystemTime::now();
    now.duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as u64)
        .unwrap_or(0)
}
3269
/// Returns the reason phrase placed after `status` in the HTTP status line.
///
/// Codes without a dedicated phrase fall back to a generic phrase for their
/// class. This way a 4xx or 5xx response is never labelled "OK".
fn http_reason_phrase(status: u16) -> &'static str {
    match status {
        200 => "OK",
        201 => "Created",
        202 => "Accepted",
        400 => "Bad Request",
        401 => "Unauthorized",
        403 => "Forbidden",
        404 => "Not Found",
        405 => "Method Not Allowed",
        408 => "Request Timeout",
        409 => "Conflict",
        413 => "Content Too Large",
        415 => "Unsupported Media Type",
        422 => "Unprocessable Content",
        429 => "Too Many Requests",
        500 => "Internal Server Error",
        501 => "Not Implemented",
        503 => "Service Unavailable",
        507 => "Insufficient Storage",
        100..=199 => "Informational",
        200..=299 => "OK",
        300..=399 => "Redirection",
        400..=499 => "Client Error",
        500..=599 => "Server Error",
        _ => "Unknown",
    }
}

/// Serializes a complete HTTP/1.1 response with `Connection: close` into `out`, then flushes it.
///
/// `Content-Length` is the UTF-8 byte length of `body`, not its character count.
fn write_http_response<W: Write>(
    out: &mut W,
    status: u16,
    content_type: &str,
    body: &str,
) -> std::io::Result<()> {
    let header = format!(
        "HTTP/1.1 {status} {reason}\r\nContent-Type: {content_type}\r\nContent-Length: {length}\r\nConnection: close\r\n\r\n",
        reason = http_reason_phrase(status),
        length = body.len(),
    );
    out.write_all(header.as_bytes())?;
    out.write_all(body.as_bytes())?;
    out.flush()
}

/// Writes an HTTP/1.1 response (status line, headers, body) to `stream` and flushes it.
///
/// # Errors
/// Returns any I/O error raised while writing to or flushing the socket.
fn write_response(
    stream: &mut TcpStream,
    status: u16,
    content_type: &str,
    body: &str,
) -> std::io::Result<()> {
    write_http_response(stream, status, content_type, body)
}
3298
3299#[cfg(test)]
3300mod tests {
3301 use super::*;
3302 use crate::security::AuditLogger;
3303 use crate::{ChunkInput, RbacPolicy, RuntimeConfig};
3304 use serde_json::json;
3305 use tempfile::{NamedTempFile, tempdir};
3306
3307 fn make_request(
3308 method: &str,
3309 path: &str,
3310 body: Option<&str>,
3311 token: Option<&str>,
3312 ) -> HttpRequest {
3313 let mut headers = HashMap::new();
3314 let body_bytes = body.unwrap_or_default().as_bytes().to_vec();
3315 if !body_bytes.is_empty() {
3316 headers.insert("content-length".to_string(), body_bytes.len().to_string());
3317 headers.insert("content-type".to_string(), "application/json".to_string());
3318 }
3319 if let Some(value) = token {
3320 headers.insert("x-sqlrite-control-token".to_string(), value.to_string());
3321 }
3322
3323 HttpRequest {
3324 method: method.to_string(),
3325 path: path.to_string(),
3326 headers,
3327 body: body_bytes,
3328 }
3329 }
3330
3331 fn replication_primary_config() -> ServerConfig {
3332 let mut cfg = ServerConfig::default();
3333 cfg.ha_profile.replication.enabled = true;
3334 cfg.ha_profile.replication.role = ServerRole::Primary;
3335 cfg.ha_profile.replication.sync_ack_quorum = 2;
3336 cfg.ha_profile.replication.node_id = "node-a".to_string();
3337 cfg
3338 }
3339
3340 fn secure_server_config(audit_log_path: PathBuf) -> ServerConfig {
3341 ServerConfig {
3342 security: ServerSecurityConfig {
3343 secure_defaults: true,
3344 require_auth_context: true,
3345 policy: Some(RbacPolicy::default()),
3346 audit_log_path: Some(audit_log_path),
3347 ..ServerSecurityConfig::default()
3348 },
3349 ..ServerConfig::default()
3350 }
3351 }
3352
3353 #[test]
3354 fn parses_http_request_line() {
3355 assert_eq!(
3356 parse_http_request_line("GET /healthz HTTP/1.1\r\n"),
3357 Some(("GET", "/healthz"))
3358 );
3359 assert_eq!(parse_http_request_line(""), None);
3360 }
3361
3362 #[test]
3363 fn builds_health_response_with_ha_payload() -> Result<()> {
3364 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3365 db.ingest_chunk(&ChunkInput {
3366 id: "c1".to_string(),
3367 doc_id: "d1".to_string(),
3368 content: "health endpoint".to_string(),
3369 embedding: vec![1.0, 0.0],
3370 metadata: json!({"tenant": "acme"}),
3371 source: None,
3372 })?;
3373
3374 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3375 HaRuntimeProfile::default(),
3376 )));
3377 let request = make_request("GET", "/healthz", None, None);
3378 let (status, content_type, body) = build_response(
3379 &db,
3380 Path::new(":memory:"),
3381 DurabilityProfile::Balanced,
3382 &ServerConfig::default(),
3383 &state,
3384 &request,
3385 )?;
3386 assert_eq!(status, 200);
3387 assert_eq!(content_type, "application/json");
3388 assert!(body.contains("storage"));
3389 assert!(body.contains("ha"));
3390 Ok(())
3391 }
3392
3393 #[test]
3394 fn control_plane_requires_token_when_configured() -> Result<()> {
3395 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3396 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3397 HaRuntimeProfile::default(),
3398 )));
3399 let request = make_request("POST", "/control/v1/failover/start", Some("{}"), None);
3400 let config = ServerConfig {
3401 control_api_token: Some("secret".to_string()),
3402 ..ServerConfig::default()
3403 };
3404
3405 let (status, _content_type, body) = build_response(
3406 &db,
3407 Path::new(":memory:"),
3408 DurabilityProfile::Balanced,
3409 &config,
3410 &state,
3411 &request,
3412 )?;
3413 assert_eq!(status, 401);
3414 assert!(body.contains("unauthorized"));
3415 Ok(())
3416 }
3417
3418 #[test]
3419 fn sql_endpoint_rejects_non_post() -> Result<()> {
3420 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3421 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3422 HaRuntimeProfile::default(),
3423 )));
3424 let request = make_request("GET", "/v1/sql", None, None);
3425
3426 let (status, _content_type, body) = build_response(
3427 &db,
3428 Path::new(":memory:"),
3429 DurabilityProfile::Balanced,
3430 &ServerConfig::default(),
3431 &state,
3432 &request,
3433 )?;
3434 assert_eq!(status, 405);
3435 assert!(body.contains("method not allowed"));
3436 Ok(())
3437 }
3438
3439 #[test]
3440 fn sql_endpoint_reports_parse_error() -> Result<()> {
3441 let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
3442 let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
3443 db.ingest_chunk(&ChunkInput {
3444 id: "c1".to_string(),
3445 doc_id: "d1".to_string(),
3446 content: "agent memory".to_string(),
3447 embedding: vec![1.0, 0.0],
3448 metadata: json!({}),
3449 source: None,
3450 })?;
3451
3452 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3453 HaRuntimeProfile::default(),
3454 )));
3455 let request = make_request("POST", "/v1/sql", Some("{}"), None);
3456
3457 let (status, _content_type, body) = build_response(
3458 &db,
3459 db_file.path(),
3460 DurabilityProfile::Balanced,
3461 &ServerConfig::default(),
3462 &state,
3463 &request,
3464 )?;
3465 assert_eq!(status, 400);
3466 assert!(body.contains("statement"));
3467 Ok(())
3468 }
3469
3470 #[test]
3471 fn sql_endpoint_executes_search_statement() -> Result<()> {
3472 let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
3473 let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
3474 db.ingest_chunk(&ChunkInput {
3475 id: "search-sql-1".to_string(),
3476 doc_id: "doc-1".to_string(),
3477 content: "server sql search endpoint".to_string(),
3478 embedding: vec![1.0, 0.0],
3479 metadata: json!({"tenant": "demo"}),
3480 source: Some("docs/search-sql-1.md".to_string()),
3481 })?;
3482
3483 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3484 HaRuntimeProfile::default(),
3485 )));
3486 let body = json!({
3487 "statement": "SELECT chunk_id, doc_id, hybrid_score FROM SEARCH('server sql', vector('1,0'), 3, 0.65, 500, 'balanced', NULL, NULL) ORDER BY hybrid_score DESC, chunk_id ASC;"
3488 })
3489 .to_string();
3490 let request = make_request("POST", "/v1/sql", Some(&body), None);
3491
3492 let (status, _content_type, body) = build_response(
3493 &db,
3494 db_file.path(),
3495 DurabilityProfile::Balanced,
3496 &ServerConfig::default(),
3497 &state,
3498 &request,
3499 )?;
3500 assert_eq!(status, 200, "{body}");
3501 assert!(body.contains("\"chunk_id\":\"search-sql-1\""));
3502 Ok(())
3503 }
3504
3505 #[test]
3506 fn openapi_endpoint_exposes_query_and_grpc_paths() -> Result<()> {
3507 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3508 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3509 HaRuntimeProfile::default(),
3510 )));
3511 let request = make_request("GET", "/v1/openapi.json", None, None);
3512
3513 let (status, _content_type, body) = build_response(
3514 &db,
3515 Path::new(":memory:"),
3516 DurabilityProfile::Balanced,
3517 &ServerConfig::default(),
3518 &state,
3519 &request,
3520 )?;
3521 assert_eq!(status, 200);
3522 assert!(body.contains("\"/v1/query\""));
3523 assert!(body.contains("\"/v1/query-compact\""));
3524 assert!(body.contains("\"/grpc/sqlrite.v1.QueryService/Query\""));
3525 assert!(body.contains("\"/grpc/sqlrite.v1.QueryService/Sql\""));
3526 Ok(())
3527 }
3528
3529 #[test]
3530 fn openapi_endpoint_hides_sql_paths_when_sql_endpoint_disabled() -> Result<()> {
3531 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3532 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3533 HaRuntimeProfile::default(),
3534 )));
3535 let request = make_request("GET", "/v1/openapi.json", None, None);
3536 let config = ServerConfig {
3537 enable_sql_endpoint: false,
3538 ..ServerConfig::default()
3539 };
3540
3541 let (status, _content_type, body) = build_response(
3542 &db,
3543 Path::new(":memory:"),
3544 DurabilityProfile::Balanced,
3545 &config,
3546 &state,
3547 &request,
3548 )?;
3549 assert_eq!(status, 200);
3550 assert!(!body.contains("\"/v1/sql\""));
3551 assert!(!body.contains("\"/v1/query\""));
3552 assert!(!body.contains("\"/grpc/sqlrite.v1.QueryService/Sql\""));
3553 assert!(!body.contains("\"/grpc/sqlrite.v1.QueryService/Query\""));
3554 assert!(body.contains("\"/v1/openapi.json\""));
3555 Ok(())
3556 }
3557
3558 #[test]
3559 fn security_endpoint_reports_secure_defaults_and_roles() -> Result<()> {
3560 let tmp = tempdir()?;
3561 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3562 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3563 HaRuntimeProfile::default(),
3564 )));
3565 let config = secure_server_config(tmp.path().join("audit.jsonl"));
3566 let request = make_request("GET", "/control/v1/security", None, None);
3567
3568 let (status, _, body) = build_response(
3569 &db,
3570 Path::new(":memory:"),
3571 DurabilityProfile::Balanced,
3572 &config,
3573 &state,
3574 &request,
3575 )?;
3576 assert_eq!(status, 200);
3577 assert!(body.contains("\"enabled\":true"));
3578 assert!(body.contains("\"require_auth_context\":true"));
3579 assert!(body.contains("tenant_admin"));
3580 Ok(())
3581 }
3582
3583 #[test]
3584 fn security_audit_export_endpoint_writes_filtered_jsonl() -> Result<()> {
3585 let tmp = tempdir()?;
3586 let audit_path = tmp.path().join("audit.jsonl");
3587 let export_path = tmp.path().join("export.jsonl");
3588 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3589 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3590 HaRuntimeProfile::default(),
3591 )));
3592 let mut config = secure_server_config(audit_path.clone());
3593 config.control_api_token = Some("secret".to_string());
3594
3595 let logger = JsonlAuditLogger::new(&audit_path, Vec::<String>::new())?;
3596 logger.log(&AuditEvent {
3597 unix_ms: 10,
3598 actor_id: "reader-1".to_string(),
3599 tenant_id: "acme".to_string(),
3600 operation: AccessOperation::Query,
3601 allowed: true,
3602 detail: json!({"path":"/v1/query"}),
3603 })?;
3604 logger.log(&AuditEvent {
3605 unix_ms: 20,
3606 actor_id: "admin-1".to_string(),
3607 tenant_id: "acme".to_string(),
3608 operation: AccessOperation::SqlAdmin,
3609 allowed: false,
3610 detail: json!({"path":"/v1/sql"}),
3611 })?;
3612
3613 let request = make_request(
3614 "POST",
3615 "/control/v1/security/audit/export",
3616 Some(&format!(
3617 "{{\"actor_id\":\"reader-1\",\"output_path\":\"{}\",\"format\":\"jsonl\"}}",
3618 export_path.display()
3619 )),
3620 Some("secret"),
3621 );
3622 let (status, _, body) = build_response(
3623 &db,
3624 Path::new(":memory:"),
3625 DurabilityProfile::Balanced,
3626 &config,
3627 &state,
3628 &request,
3629 )?;
3630 assert_eq!(status, 200);
3631 assert!(body.contains("\"matched_events\":1"));
3632 let export = std::fs::read_to_string(export_path)?;
3633 assert!(export.contains("reader-1"));
3634 assert!(!export.contains("admin-1"));
3635 Ok(())
3636 }
3637
3638 #[test]
3639 fn secure_query_requires_auth_context_headers() -> Result<()> {
3640 let tmp = tempdir()?;
3641 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3642 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3643 HaRuntimeProfile::default(),
3644 )));
3645 let config = secure_server_config(tmp.path().join("audit.jsonl"));
3646 let request = make_request(
3647 "POST",
3648 "/v1/query",
3649 Some(r#"{"query_text":"agent","top_k":1}"#),
3650 None,
3651 );
3652
3653 let (status, _, body) = build_response(
3654 &db,
3655 Path::new(":memory:"),
3656 DurabilityProfile::Balanced,
3657 &config,
3658 &state,
3659 &request,
3660 )?;
3661 assert_eq!(status, 403);
3662 assert!(body.contains("missing auth context"));
3663 Ok(())
3664 }
3665
3666 #[test]
3667 fn secure_query_enforces_tenant_headers_and_audits() -> Result<()> {
3668 let tmp = tempdir()?;
3669 let audit_path = tmp.path().join("audit.jsonl");
3670 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3671 db.ingest_chunk(&ChunkInput {
3672 id: "tenant-1".to_string(),
3673 doc_id: "doc-1".to_string(),
3674 content: "tenant scoped agent memory".to_string(),
3675 embedding: vec![1.0, 0.0],
3676 metadata: json!({"tenant": "acme"}),
3677 source: None,
3678 })?;
3679 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3680 HaRuntimeProfile::default(),
3681 )));
3682 let config = secure_server_config(audit_path.clone());
3683
3684 let mut ok_request = make_request(
3685 "POST",
3686 "/v1/query",
3687 Some(r#"{"query_text":"agent","top_k":1}"#),
3688 None,
3689 );
3690 ok_request
3691 .headers
3692 .insert("x-sqlrite-actor-id".to_string(), "reader-1".to_string());
3693 ok_request
3694 .headers
3695 .insert("x-sqlrite-tenant-id".to_string(), "acme".to_string());
3696 ok_request
3697 .headers
3698 .insert("x-sqlrite-roles".to_string(), "reader".to_string());
3699
3700 let (status, _, body) = build_response(
3701 &db,
3702 Path::new(":memory:"),
3703 DurabilityProfile::Balanced,
3704 &config,
3705 &state,
3706 &ok_request,
3707 )?;
3708 assert_eq!(status, 200);
3709 assert!(body.contains("\"row_count\":1"));
3710
3711 let mut denied_request = make_request(
3712 "POST",
3713 "/v1/query",
3714 Some(r#"{"query_text":"agent","top_k":1,"metadata_filters":{"tenant":"beta"}}"#),
3715 None,
3716 );
3717 denied_request
3718 .headers
3719 .insert("x-sqlrite-actor-id".to_string(), "reader-1".to_string());
3720 denied_request
3721 .headers
3722 .insert("x-sqlrite-tenant-id".to_string(), "acme".to_string());
3723 denied_request
3724 .headers
3725 .insert("x-sqlrite-roles".to_string(), "reader".to_string());
3726
3727 let (status, _, body) = build_response(
3728 &db,
3729 Path::new(":memory:"),
3730 DurabilityProfile::Balanced,
3731 &config,
3732 &state,
3733 &denied_request,
3734 )?;
3735 assert_eq!(status, 403);
3736 assert!(body.contains("tenant filter mismatch"));
3737
3738 let audit = std::fs::read_to_string(audit_path)?;
3739 assert!(audit.contains("\"allowed\":true"));
3740 assert!(audit.contains("\"allowed\":false"));
3741 Ok(())
3742 }
3743
3744 #[test]
3745 fn rerank_hook_endpoint_returns_scored_candidates() -> Result<()> {
3746 let tmp = tempdir()?;
3747 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3748 db.ingest_chunk(&ChunkInput {
3749 id: "r1".to_string(),
3750 doc_id: "doc-1".to_string(),
3751 content: "rerank candidate agent memory".to_string(),
3752 embedding: vec![1.0, 0.0],
3753 metadata: json!({"tenant": "acme"}),
3754 source: None,
3755 })?;
3756 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3757 HaRuntimeProfile::default(),
3758 )));
3759 let config = secure_server_config(tmp.path().join("audit.jsonl"));
3760
3761 let mut request = make_request(
3762 "POST",
3763 "/v1/rerank-hook",
3764 Some(r#"{"query_text":"agent","candidate_count":5}"#),
3765 None,
3766 );
3767 request
3768 .headers
3769 .insert("x-sqlrite-actor-id".to_string(), "reader-1".to_string());
3770 request
3771 .headers
3772 .insert("x-sqlrite-tenant-id".to_string(), "acme".to_string());
3773 request
3774 .headers
3775 .insert("x-sqlrite-roles".to_string(), "reader".to_string());
3776
3777 let (status, _, body) = build_response(
3778 &db,
3779 Path::new(":memory:"),
3780 DurabilityProfile::Balanced,
3781 &config,
3782 &state,
3783 &request,
3784 )?;
3785 assert_eq!(status, 200);
3786 assert!(body.contains("\"kind\":\"rerank_hook\""));
3787 assert!(body.contains("\"vector_score\""));
3788 assert!(body.contains("\"text_score\""));
3789 Ok(())
3790 }
3791
3792 #[test]
3793 fn secure_sql_requires_admin_role() -> Result<()> {
3794 let tmp = tempdir()?;
3795 let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
3796 let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
3797 db.ingest_chunk(&ChunkInput {
3798 id: "sql-1".to_string(),
3799 doc_id: "doc-1".to_string(),
3800 content: "sql secure query".to_string(),
3801 embedding: vec![1.0, 0.0],
3802 metadata: json!({"tenant": "acme"}),
3803 source: None,
3804 })?;
3805 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3806 HaRuntimeProfile::default(),
3807 )));
3808 let config = secure_server_config(tmp.path().join("audit.jsonl"));
3809
3810 let mut reader_request = make_request(
3811 "POST",
3812 "/v1/sql",
3813 Some(r#"{"statement":"SELECT id FROM chunks ORDER BY id ASC LIMIT 1;"}"#),
3814 None,
3815 );
3816 reader_request
3817 .headers
3818 .insert("x-sqlrite-actor-id".to_string(), "reader-1".to_string());
3819 reader_request
3820 .headers
3821 .insert("x-sqlrite-tenant-id".to_string(), "acme".to_string());
3822 reader_request
3823 .headers
3824 .insert("x-sqlrite-roles".to_string(), "reader".to_string());
3825
3826 let (status, _, body) = build_response(
3827 &db,
3828 db_file.path(),
3829 DurabilityProfile::Balanced,
3830 &config,
3831 &state,
3832 &reader_request,
3833 )?;
3834 assert_eq!(status, 403);
3835 assert!(body.contains("authorization denied"));
3836
3837 let mut admin_request = make_request(
3838 "POST",
3839 "/v1/sql",
3840 Some(r#"{"statement":"SELECT id FROM chunks ORDER BY id ASC LIMIT 1;"}"#),
3841 None,
3842 );
3843 admin_request
3844 .headers
3845 .insert("x-sqlrite-actor-id".to_string(), "admin-1".to_string());
3846 admin_request
3847 .headers
3848 .insert("x-sqlrite-tenant-id".to_string(), "acme".to_string());
3849 admin_request
3850 .headers
3851 .insert("x-sqlrite-roles".to_string(), "admin".to_string());
3852
3853 let (status, _, body) = build_response(
3854 &db,
3855 db_file.path(),
3856 DurabilityProfile::Balanced,
3857 &config,
3858 &state,
3859 &admin_request,
3860 )?;
3861 assert_eq!(status, 200);
3862 assert!(body.contains("sql-1"));
3863 Ok(())
3864 }
3865
3866 #[test]
3867 fn query_and_grpc_query_endpoints_return_results() -> Result<()> {
3868 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3869 db.ingest_chunk(&ChunkInput {
3870 id: "query-1".to_string(),
3871 doc_id: "doc-1".to_string(),
3872 content: "agent query endpoint".to_string(),
3873 embedding: vec![1.0, 0.0],
3874 metadata: json!({"tenant": "demo"}),
3875 source: None,
3876 })?;
3877 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3878 HaRuntimeProfile::default(),
3879 )));
3880 let config = ServerConfig::default();
3881
3882 let query_req = make_request(
3883 "POST",
3884 "/v1/query",
3885 Some(r#"{"query_text":"agent","top_k":1}"#),
3886 None,
3887 );
3888 let (status, _, body) = build_response(
3889 &db,
3890 Path::new(":memory:"),
3891 DurabilityProfile::Balanced,
3892 &config,
3893 &state,
3894 &query_req,
3895 )?;
3896 assert_eq!(status, 200);
3897 assert!(body.contains("\"kind\":\"query\""));
3898 assert!(body.contains("\"row_count\":1"));
3899
3900 let compact_req = make_request(
3901 "POST",
3902 "/v1/query-compact",
3903 Some(r#"{"query_text":"agent","top_k":1,"include_payloads":false}"#),
3904 None,
3905 );
3906 let (status, _, body) = build_response(
3907 &db,
3908 Path::new(":memory:"),
3909 DurabilityProfile::Balanced,
3910 &config,
3911 &state,
3912 &compact_req,
3913 )?;
3914 assert_eq!(status, 200);
3915 assert!(body.contains("\"kind\":\"query_compact\""));
3916 assert!(body.contains("\"chunk_ids\":[\"query-1\"]"));
3917
3918 let grpc_query_req = make_request(
3919 "POST",
3920 "/grpc/sqlrite.v1.QueryService/Query",
3921 Some(r#"{"query_text":"agent","top_k":1}"#),
3922 None,
3923 );
3924 let (status, _, body) = build_response(
3925 &db,
3926 Path::new(":memory:"),
3927 DurabilityProfile::Balanced,
3928 &config,
3929 &state,
3930 &grpc_query_req,
3931 )?;
3932 assert_eq!(status, 200);
3933 assert!(body.contains("\"kind\":\"query\""));
3934 Ok(())
3935 }
3936
3937 #[test]
3938 fn query_and_grpc_endpoints_reject_non_post_methods() -> Result<()> {
3939 let db = SqlRite::open_in_memory_with_config(RuntimeConfig::default())?;
3940 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3941 HaRuntimeProfile::default(),
3942 )));
3943 let config = ServerConfig::default();
3944
3945 let query_get = make_request("GET", "/v1/query", None, None);
3946 let (status, _, body) = build_response(
3947 &db,
3948 Path::new(":memory:"),
3949 DurabilityProfile::Balanced,
3950 &config,
3951 &state,
3952 &query_get,
3953 )?;
3954 assert_eq!(status, 405);
3955 assert!(body.contains("POST /v1/query"));
3956
3957 let compact_get = make_request("GET", "/v1/query-compact", None, None);
3958 let (status, _, body) = build_response(
3959 &db,
3960 Path::new(":memory:"),
3961 DurabilityProfile::Balanced,
3962 &config,
3963 &state,
3964 &compact_get,
3965 )?;
3966 assert_eq!(status, 405);
3967 assert!(body.contains("POST /v1/query-compact"));
3968
3969 let grpc_sql_get = make_request("GET", "/grpc/sqlrite.v1.QueryService/Sql", None, None);
3970 let (status, _, body) = build_response(
3971 &db,
3972 Path::new(":memory:"),
3973 DurabilityProfile::Balanced,
3974 &config,
3975 &state,
3976 &grpc_sql_get,
3977 )?;
3978 assert_eq!(status, 405);
3979 assert!(body.contains("POST /grpc/sqlrite.v1.QueryService/Sql"));
3980 Ok(())
3981 }
3982
3983 #[test]
3984 fn grpc_sql_endpoint_executes_sql_statement() -> Result<()> {
3985 let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
3986 let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
3987 db.ingest_chunk(&ChunkInput {
3988 id: "grpc-sql-1".to_string(),
3989 doc_id: "doc-1".to_string(),
3990 content: "grpc sql endpoint".to_string(),
3991 embedding: vec![1.0, 0.0],
3992 metadata: json!({}),
3993 source: None,
3994 })?;
3995 let state = Arc::new(Mutex::new(ControlPlaneState::new(
3996 HaRuntimeProfile::default(),
3997 )));
3998 let config = ServerConfig::default();
3999
4000 let grpc_sql_req = make_request(
4001 "POST",
4002 "/grpc/sqlrite.v1.QueryService/Sql",
4003 Some(r#"{"statement":"SELECT id, doc_id FROM chunks ORDER BY id ASC LIMIT 1;"}"#),
4004 None,
4005 );
4006 let (status, _, body) = build_response(
4007 &db,
4008 db_file.path(),
4009 DurabilityProfile::Balanced,
4010 &config,
4011 &state,
4012 &grpc_sql_req,
4013 )?;
4014 assert_eq!(status, 200);
4015 assert!(body.contains("\"kind\":\"query\""));
4016 assert!(body.contains("\"row_count\":1"));
4017 assert!(body.contains("grpc-sql-1"));
4018 Ok(())
4019 }
4020
4021 #[test]
4022 fn replication_append_and_ack_advances_commit_index() -> Result<()> {
4023 let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4024 let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4025 let config = replication_primary_config();
4026 let state = Arc::new(Mutex::new(ControlPlaneState::new(
4027 config.ha_profile.clone(),
4028 )));
4029
4030 let append = make_request(
4031 "POST",
4032 "/control/v1/replication/append",
4033 Some(r#"{"operation":"ingest_chunk","payload":{"id":"c1"}}"#),
4034 None,
4035 );
4036 let (status, _, body) = build_response(
4037 &db,
4038 db_file.path(),
4039 DurabilityProfile::Balanced,
4040 &config,
4041 &state,
4042 &append,
4043 )?;
4044 assert_eq!(status, 200);
4045 assert!(body.contains("commit_index"));
4046
4047 let ack = make_request(
4048 "POST",
4049 "/control/v1/replication/ack",
4050 Some(r#"{"node_id":"node-b","index":1}"#),
4051 None,
4052 );
4053 let (status, _, body) = build_response(
4054 &db,
4055 db_file.path(),
4056 DurabilityProfile::Balanced,
4057 &config,
4058 &state,
4059 &ack,
4060 )?;
4061 assert_eq!(status, 200);
4062 assert!(body.contains("\"commit_index\":1"));
4063 Ok(())
4064 }
4065
4066 #[test]
4067 fn election_vote_rejects_stale_term() -> Result<()> {
4068 let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
4069 let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
4070 let mut config = replication_primary_config();
4071 config.ha_profile.replication.role = ServerRole::Replica;
4072 config.ha_profile.replication.node_id = "node-b".to_string();
4073 let mut state_inner = ControlPlaneState::new(config.ha_profile.clone());
4074 state_inner.runtime.current_term = 5;
4075 state_inner.runtime.note_log_position(10, 5);
4076 let state = Arc::new(Mutex::new(state_inner));
4077
4078 let vote = make_request(
4079 "POST",
4080 "/control/v1/election/request-vote",
4081 Some(
4082 r#"{"term":4,"candidate_id":"node-a","candidate_last_log_index":10,"candidate_last_log_term":5}"#,
4083 ),
4084 None,
4085 );
4086
4087 let (status, _, body) = build_response(
4088 &db,
4089 db_file.path(),
4090 DurabilityProfile::Balanced,
4091 &config,
4092 &state,
4093 &vote,
4094 )?;
4095 assert_eq!(status, 200);
4096 assert!(body.contains("\"vote_granted\":false"));
4097 assert!(body.contains("stale_term"));
4098 Ok(())
4099 }
4100
#[test]
fn reconciliation_returns_missing_entries() -> Result<()> {
    // Appends three entries to the primary's replication log, then asks the
    // reconcile endpoint for everything after index 1 on behalf of a lagging
    // node; entry 2 must be part of the reply.
    let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
    let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
    let config = replication_primary_config();
    let state = Arc::new(Mutex::new(ControlPlaneState::new(
        config.ha_profile.clone(),
    )));

    for i in 1..=3 {
        let append = make_request(
            "POST",
            "/control/v1/replication/append",
            Some(&format!(
                "{{\"operation\":\"write\",\"payload\":{{\"n\":{i}}}}}"
            )),
            None,
        );
        let (append_status, _, append_body) = build_response(
            &db,
            db_file.path(),
            DurabilityProfile::Balanced,
            &config,
            &state,
            &append,
        )?;
        // Fail here, with the server's reply, instead of later on the
        // missing-index assertion if an append was rejected.
        assert!(
            (200..300).contains(&append_status),
            "append {i} failed with status {append_status}: {append_body}"
        );
    }

    let reconcile = make_request(
        "POST",
        "/control/v1/replication/reconcile",
        Some(r#"{"node_id":"node-c","last_applied_index":1}"#),
        None,
    );
    let (status, _, body) = build_response(
        &db,
        db_file.path(),
        DurabilityProfile::Balanced,
        &config,
        &state,
        &reconcile,
    )?;
    assert_eq!(status, 200);
    assert!(body.contains("missing_entries_count"));
    assert!(body.contains("\"index\":2"));
    Ok(())
}
4148
#[test]
fn auto_failover_check_promotes_replica_when_forced() -> Result<()> {
    // A replica in automatic-failover mode that receives a forced auto-check
    // is expected to trigger failover and report itself as primary.
    let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
    let db_path = db_file.path();
    let db = SqlRite::open_with_config(db_path, RuntimeConfig::default())?;

    let mut config = replication_primary_config();
    let replication = &mut config.ha_profile.replication;
    replication.role = ServerRole::Replica;
    replication.node_id = "node-b".to_string();
    replication.failover_mode = FailoverMode::Automatic;

    let control = ControlPlaneState::new(config.ha_profile.clone());
    let shared_state = Arc::new(Mutex::new(control));

    let check_request = make_request(
        "POST",
        "/control/v1/failover/auto-check",
        Some(r#"{"force":true,"reason":"test_auto"}"#),
        None,
    );
    let (code, _, payload) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &check_request,
    )?;

    assert_eq!(code, 200);
    for needle in ["\"triggered\":true", "\"role\":\"primary\""] {
        assert!(payload.contains(needle));
    }
    Ok(())
}
4180
#[test]
fn auto_failover_check_accepts_missing_force_field() -> Result<()> {
    // The auto-check request body omits `force` entirely; the endpoint must
    // still accept it, and the simulated 5000 ms of elapsed time is expected
    // to trigger failover with the caller-supplied reason echoed back.
    let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
    let db_path = db_file.path();
    let db = SqlRite::open_with_config(db_path, RuntimeConfig::default())?;

    let mut config = replication_primary_config();
    let replication = &mut config.ha_profile.replication;
    replication.role = ServerRole::Replica;
    replication.node_id = "node-b".to_string();
    replication.failover_mode = FailoverMode::Automatic;

    let control = ControlPlaneState::new(config.ha_profile.clone());
    let shared_state = Arc::new(Mutex::new(control));

    let check_body = r#"{"simulate_elapsed_ms":5000,"reason":"timeout-path"}"#;
    let check_request = make_request(
        "POST",
        "/control/v1/failover/auto-check",
        Some(check_body),
        None,
    );
    let (code, _, payload) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &check_request,
    )?;

    assert_eq!(code, 200);
    for needle in ["\"triggered\":true", "\"reason\":\"timeout-path\""] {
        assert!(payload.contains(needle));
    }
    Ok(())
}
4212
#[test]
fn chaos_disk_full_blocks_replication_append() -> Result<()> {
    // Once the `disk_full` chaos fault is injected, a replication append must
    // be refused with HTTP 507 and a body naming the fault.
    let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
    let db_path = db_file.path();
    let db = SqlRite::open_with_config(db_path, RuntimeConfig::default())?;
    let config = replication_primary_config();
    let control = ControlPlaneState::new(config.ha_profile.clone());
    let shared_state = Arc::new(Mutex::new(control));

    let inject_request = make_request(
        "POST",
        "/control/v1/chaos/inject",
        Some(r#"{"scenario":"disk_full"}"#),
        None,
    );
    let (inject_code, _, _) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &inject_request,
    )?;
    assert_eq!(inject_code, 200);

    let append_body = r#"{"operation":"ingest_chunk","payload":{"id":"c1"}}"#;
    let append_request = make_request(
        "POST",
        "/control/v1/replication/append",
        Some(append_body),
        None,
    );
    let (append_code, _, append_payload) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &append_request,
    )?;
    assert_eq!(append_code, 507);
    assert!(append_payload.contains("disk_full"));
    Ok(())
}
4256
#[test]
fn chaos_node_crash_blocks_health_but_not_status_endpoint() -> Result<()> {
    // After injecting `node_crash`, the readiness probe must report 503 and
    // name the fault, while the chaos status endpoint stays reachable.
    let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
    let db = SqlRite::open_with_config(db_file.path(), RuntimeConfig::default())?;
    let config = replication_primary_config();
    let state = Arc::new(Mutex::new(ControlPlaneState::new(
        config.ha_profile.clone(),
    )));

    let inject = make_request(
        "POST",
        "/control/v1/chaos/inject",
        Some(r#"{"scenario":"node_crash"}"#),
        None,
    );
    let (inject_status, _, _) = build_response(
        &db,
        db_file.path(),
        DurabilityProfile::Balanced,
        &config,
        &state,
        &inject,
    )?;
    // The injection must actually be accepted; otherwise the readiness
    // assertions below would fail (or pass) for unrelated reasons.
    assert_eq!(inject_status, 200);

    let health = make_request("GET", "/readyz", None, None);
    let (status, _, body) = build_response(
        &db,
        db_file.path(),
        DurabilityProfile::Balanced,
        &config,
        &state,
        &health,
    )?;
    assert_eq!(status, 503);
    assert!(body.contains("node_crash"));

    let status_req = make_request("GET", "/control/v1/chaos/status", None, None);
    let (status, _, body) = build_response(
        &db,
        db_file.path(),
        DurabilityProfile::Balanced,
        &config,
        &state,
        &status_req,
    )?;
    assert_eq!(status, 200);
    assert!(body.contains("active_fault_count"));
    Ok(())
}
4306
#[test]
fn recovery_timing_is_recorded_between_start_and_mark_restored() -> Result<()> {
    // Drives a recovery through start -> mark-restored and checks that the
    // resilience report then exposes the restore counter and last duration.
    let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
    let db_path = db_file.path();
    let db = SqlRite::open_with_config(db_path, RuntimeConfig::default())?;
    let config = replication_primary_config();
    let control = ControlPlaneState::new(config.ha_profile.clone());
    let shared_state = Arc::new(Mutex::new(control));

    let start_request = make_request("POST", "/control/v1/recovery/start", Some(r#"{}"#), None);
    let (start_code, _, _) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &start_request,
    )?;
    assert_eq!(start_code, 200);

    let restored_request = make_request(
        "POST",
        "/control/v1/recovery/mark-restored",
        Some(r#"{"note":"restore_done"}"#),
        None,
    );
    let (restored_code, _, _) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &restored_request,
    )?;
    assert_eq!(restored_code, 200);

    let report_request = make_request("GET", "/control/v1/resilience", None, None);
    let (report_code, _, report) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &report_request,
    )?;
    assert_eq!(report_code, 200);
    for needle in ["restore_completed_total", "last_restore_duration_ms"] {
        assert!(report.contains(needle));
    }
    Ok(())
}
4357
#[test]
fn recovery_snapshot_verify_and_prune_endpoints_work() -> Result<()> {
    // Walks the snapshot lifecycle end to end: create a snapshot, list it,
    // verify it can be restored, then prune with a zero-second retention.
    let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
    let db_path = db_file.path();
    let db = SqlRite::open_with_config(db_path, RuntimeConfig::default())?;
    db.ingest_chunk(&ChunkInput {
        id: "c1".to_string(),
        doc_id: "d1".to_string(),
        content: "snapshot payload".to_string(),
        embedding: vec![1.0, 0.0],
        metadata: json!({}),
        source: None,
    })?;

    // Snapshots go to a throwaway directory so the test never touches the
    // configured default backup location.
    let backup_root = tempdir().map_err(std::io::Error::other)?;
    let mut config = replication_primary_config();
    let recovery = &mut config.ha_profile.recovery;
    recovery.backup_dir = backup_root.path().display().to_string();
    recovery.snapshot_interval_seconds = 1;
    recovery.pitr_retention_seconds = 60;

    let control = ControlPlaneState::new(config.ha_profile.clone());
    let shared_state = Arc::new(Mutex::new(control));

    let create_request = make_request(
        "POST",
        "/control/v1/recovery/snapshot",
        Some(r#"{"note":"s17_snapshot"}"#),
        None,
    );
    let (create_code, _, created) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &create_request,
    )?;
    assert_eq!(create_code, 200);
    assert!(created.contains("snapshot_id"));

    let listing_request =
        make_request("GET", "/control/v1/recovery/snapshots?limit=10", None, None);
    let (listing_code, _, listing) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &listing_request,
    )?;
    assert_eq!(listing_code, 200);
    assert!(listing.contains("\"count\":1"));

    let verify_request = make_request(
        "POST",
        "/control/v1/recovery/verify-restore",
        Some(r#"{"keep_artifact":false}"#),
        None,
    );
    let (verify_code, _, verification) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &verify_request,
    )?;
    assert_eq!(verify_code, 200);
    assert!(verification.contains("\"restore_verified\":true"));

    let prune_request = make_request(
        "POST",
        "/control/v1/recovery/prune-snapshots",
        Some(r#"{"retention_seconds":0}"#),
        None,
    );
    let (prune_code, _, pruned) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &prune_request,
    )?;
    assert_eq!(prune_code, 200);
    assert!(pruned.contains("removed_count"));
    Ok(())
}
4443
#[test]
fn observability_metrics_and_trace_endpoints_report_sql_activity() -> Result<()> {
    // Issues one SQL request, records it in the observability state, and then
    // checks that the recent-traces and metrics-map endpoints reflect it.
    let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
    let db_path = db_file.path();
    let db = SqlRite::open_with_config(db_path, RuntimeConfig::default())?;
    db.ingest_chunk(&ChunkInput {
        id: "q1".to_string(),
        doc_id: "d1".to_string(),
        content: "query trace".to_string(),
        embedding: vec![1.0, 0.0],
        metadata: json!({}),
        source: None,
    })?;
    let config = ServerConfig::default();
    let control = ControlPlaneState::new(config.ha_profile.clone());
    let shared_state = Arc::new(Mutex::new(control));

    let sql_request = make_request(
        "POST",
        "/v1/sql",
        Some(r#"{"statement":"SELECT id FROM chunks ORDER BY id ASC LIMIT 1;"}"#),
        None,
    );
    let began_at = unix_ms_now();
    let (sql_code, _, _) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &sql_request,
    )?;
    let elapsed_ms = unix_ms_now().saturating_sub(began_at);

    // The test records the request itself (build_response does not appear to
    // do so — NOTE(review): presumably the connection loop does; confirm).
    // The temporary lock guard is released at the end of this statement.
    shared_state
        .lock()
        .map_err(|_| std::io::Error::other("state lock poisoned"))?
        .observability
        .record_request("POST", "/v1/sql", sql_code, elapsed_ms);

    let traces_request = make_request("GET", "/control/v1/traces/recent?limit=5", None, None);
    let (traces_code, _, traces) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &traces_request,
    )?;
    assert_eq!(traces_code, 200);
    assert!(traces.contains("\"/v1/sql\""));

    let metrics_request =
        make_request("GET", "/control/v1/observability/metrics-map", None, None);
    let (metrics_code, _, metrics) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &metrics_request,
    )?;
    assert_eq!(metrics_code, 200);
    assert!(metrics.contains("sqlrite_requests_sql_total"));
    Ok(())
}
4511
#[test]
fn alert_simulation_and_slo_report_reflect_thresholds() -> Result<()> {
    // With a 50 ms replication-lag ceiling and a simulated lag of 120 ms plus
    // a 20% SQL error rate, the alert simulator must fire both alerts; the SLO
    // report must expose its targets; and a reset must zero the counters.
    let db_file = NamedTempFile::new().map_err(std::io::Error::other)?;
    let db_path = db_file.path();
    let db = SqlRite::open_with_config(db_path, RuntimeConfig::default())?;
    let mut config = replication_primary_config();
    config.ha_profile.replication.max_replication_lag_ms = 50;
    let control = ControlPlaneState::new(config.ha_profile.clone());
    let shared_state = Arc::new(Mutex::new(control));

    // Seed runtime lag and one good plus one failing readiness probe.
    let mut seeded = shared_state
        .lock()
        .map_err(|_| std::io::Error::other("state lock poisoned"))?;
    seeded.runtime.replication_lag_ms = 120;
    seeded.observability.record_request("GET", "/readyz", 200, 1);
    seeded.observability.record_request("GET", "/readyz", 503, 1);
    drop(seeded);

    let simulate_body =
        r#"{"sql_error_rate":0.20,"sql_avg_latency_ms":75.0,"replication_lag_ms":120}"#;
    let simulate_request = make_request(
        "POST",
        "/control/v1/alerts/simulate",
        Some(simulate_body),
        None,
    );
    let (simulate_code, _, alerts) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &simulate_request,
    )?;
    assert_eq!(simulate_code, 200);
    for needle in ["replication_lag_high", "sql_error_rate_high"] {
        assert!(alerts.contains(needle));
    }

    let slo_request = make_request("GET", "/control/v1/slo/report", None, None);
    let (slo_code, _, slo_report) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &slo_request,
    )?;
    assert_eq!(slo_code, 200);
    for needle in ["target_percent", "target_seconds"] {
        assert!(slo_report.contains(needle));
    }

    let reset_request = make_request(
        "POST",
        "/control/v1/observability/reset",
        Some(r#"{}"#),
        None,
    );
    let (reset_code, _, after_reset) = build_response(
        &db,
        db_path,
        DurabilityProfile::Balanced,
        &config,
        &shared_state,
        &reset_request,
    )?;
    assert_eq!(reset_code, 200);
    assert!(after_reset.contains("\"requests_total\":0"));
    Ok(())
}
4580}