1use crate::activity::{Activity, ActivityType};
11use crate::admin::{
12 AlertInfo, AlertSeverity, ClusterInfo, DashboardSummary, NodeInfo, QueryStats, StorageInfo,
13};
14use crate::auth::{LoginRequest, MfaVerifyRequest, UserInfo};
15use crate::state::{AppState, GraphEdge, GraphNode, KvEntry, QueryError, QueryResult};
16use aegis_document::{Document, DocumentId, Query as DocQuery, QueryResult as DocQueryResult};
17use aegis_streaming::{event::EventData, ChannelId, Event, EventType as StreamEventType};
18use aegis_timeseries::{DataPoint, Metric, MetricType, Tags, TimeSeriesQuery};
19use axum::{
20 extract::{Path, State},
21 http::StatusCode,
22 response::IntoResponse,
23 Json,
24};
25use chrono::{Duration, Utc};
26use serde::{Deserialize, Serialize};
27use std::time::Instant;
28
/// JSON body returned by the `health_check` handler.
#[derive(Debug, Serialize)]
pub struct HealthResponse {
    /// Status label; `health_check` always sets this to `"healthy"`.
    pub status: String,
    /// Crate version, taken from `CARGO_PKG_VERSION` at compile time by `health_check`.
    pub version: String,
}
39
40pub async fn health_check() -> Json<HealthResponse> {
42 Json(HealthResponse {
43 status: "healthy".to_string(),
44 version: env!("CARGO_PKG_VERSION").to_string(),
45 })
46}
47
/// Request body accepted by `execute_query`.
#[derive(Debug, Deserialize)]
pub struct QueryRequest {
    /// Optional database name passed through to the query execution call;
    /// deserializes to `None` when the field is absent.
    #[serde(default)]
    pub database: Option<String>,
    /// SQL text to execute.
    pub sql: String,
    /// Bound parameter values; deserializes to an empty list when absent.
    #[serde(default)]
    pub params: Vec<serde_json::Value>,
}
62
/// Response body produced by `execute_query`.
#[derive(Debug, Serialize)]
pub struct QueryResponse {
    /// `true` when the query ran without error.
    pub success: bool,
    /// Query result; left out of the serialized JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<QueryResult>,
    /// Client-facing error message; left out of the serialized JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Elapsed time of the query call in milliseconds, as measured by the handler.
    pub execution_time_ms: u64,
}
73
74pub async fn execute_query(
76 State(state): State<AppState>,
77 headers: axum::http::HeaderMap,
78 Json(request): Json<QueryRequest>,
79) -> impl IntoResponse {
80 let start = Instant::now();
81 let is_replicated = headers.get("x-aegis-replicated").is_some();
82
83 let result = if is_replicated {
84 state
86 .execute_query_replicated(&request.sql, request.database.as_deref())
87 .await
88 } else if !request.params.is_empty() {
89 state
90 .execute_query_with_params(&request.sql, request.database.as_deref(), &request.params)
91 .await
92 } else {
93 state
94 .execute_query(&request.sql, request.database.as_deref())
95 .await
96 };
97 let duration_ms = start.elapsed().as_millis() as u64;
98
99 match result {
100 Ok(data) => {
101 state.record_request(duration_ms, true).await;
102 (
103 StatusCode::OK,
104 Json(QueryResponse {
105 success: true,
106 data: Some(data),
107 error: None,
108 execution_time_ms: duration_ms,
109 }),
110 )
111 }
112 Err(e) => {
113 state.record_request(duration_ms, false).await;
114 let (status, client_msg) = match &e {
115 QueryError::Parse(_) => (StatusCode::BAD_REQUEST, "Query syntax error"),
116 QueryError::Plan(_) => (StatusCode::BAD_REQUEST, "Query planning error"),
117 QueryError::Execute(_) => {
118 (StatusCode::INTERNAL_SERVER_ERROR, "Query execution error")
119 }
120 };
121 tracing::warn!("Query failed: {}", e);
122 (
123 status,
124 Json(QueryResponse {
125 success: false,
126 data: None,
127 error: Some(client_msg.to_string()),
128 execution_time_ms: duration_ms,
129 }),
130 )
131 }
132 }
133}
134
/// Response body of `list_tables`.
#[derive(Debug, Serialize)]
pub struct TablesResponse {
    /// One entry per table for which the query engine returned table info.
    pub tables: Vec<TableInfo>,
}
144
/// Serializable description of a single table.
#[derive(Debug, Serialize)]
pub struct TableInfo {
    /// Table name.
    pub name: String,
    /// Column descriptions for the table.
    pub columns: Vec<ColumnInfo>,
    /// Row count as reported by the query engine, if available.
    pub row_count: Option<u64>,
}
152
/// Serializable description of a single table column.
#[derive(Debug, Serialize)]
pub struct ColumnInfo {
    /// Column name.
    pub name: String,
    /// Column data type, as a string supplied by the query engine.
    pub data_type: String,
    /// Whether the column accepts NULL values.
    pub nullable: bool,
}
160
161pub async fn list_tables(State(state): State<AppState>) -> Json<TablesResponse> {
163 let table_names = state.query_engine.list_tables(None);
164 let tables: Vec<TableInfo> = table_names
165 .into_iter()
166 .filter_map(|name| state.query_engine.get_table_info(&name, None))
167 .map(|info| TableInfo {
168 name: info.name,
169 columns: info
170 .columns
171 .into_iter()
172 .map(|c| ColumnInfo {
173 name: c.name,
174 data_type: c.data_type,
175 nullable: c.nullable,
176 })
177 .collect(),
178 row_count: info.row_count,
179 })
180 .collect();
181 Json(TablesResponse { tables })
182}
183
184pub async fn get_table(
186 State(state): State<AppState>,
187 Path(name): Path<String>,
188) -> impl IntoResponse {
189 match state.query_engine.get_table_info(&name, None) {
190 Some(info) => Json(TableInfo {
191 name: info.name,
192 columns: info
193 .columns
194 .into_iter()
195 .map(|c| ColumnInfo {
196 name: c.name,
197 data_type: c.data_type,
198 nullable: c.nullable,
199 })
200 .collect(),
201 row_count: info.row_count,
202 }),
203 None => Json(TableInfo {
204 name,
205 columns: vec![],
206 row_count: None,
207 }),
208 }
209}
210
/// Response body of `get_metrics`, copied from the shared metrics state.
#[derive(Debug, Serialize)]
pub struct MetricsResponse {
    /// Value of the metrics state's `total_requests` counter.
    pub total_requests: u64,
    /// Value of the metrics state's `failed_requests` counter.
    pub failed_requests: u64,
    /// Result of the metrics state's `avg_duration_ms()`.
    pub avg_duration_ms: f64,
    /// Result of the metrics state's `success_rate()`.
    pub success_rate: f64,
}
223
224pub async fn get_metrics(State(state): State<AppState>) -> Json<MetricsResponse> {
226 let metrics = state.metrics.read().await;
227 Json(MetricsResponse {
228 total_requests: metrics.total_requests,
229 failed_requests: metrics.failed_requests,
230 avg_duration_ms: metrics.avg_duration_ms(),
231 success_rate: metrics.success_rate(),
232 })
233}
234
/// Generic JSON error payload.
#[derive(Debug, Serialize)]
pub struct ErrorResponse {
    /// Human-readable error message.
    pub error: String,
    /// Error code string (e.g. `"NOT_FOUND"` as used by `not_found`).
    pub code: String,
}
245
246impl ErrorResponse {
247 pub fn new(error: impl ToString, code: impl ToString) -> Self {
248 Self {
249 error: error.to_string(),
250 code: code.to_string(),
251 }
252 }
253}
254
255pub async fn not_found() -> impl IntoResponse {
257 (
258 StatusCode::NOT_FOUND,
259 Json(ErrorResponse::new("Not found", "NOT_FOUND")),
260 )
261}
262
263pub async fn get_cluster_info(State(state): State<AppState>) -> Json<ClusterInfo> {
269 Json(state.admin.get_cluster_info())
270}
271
272pub async fn get_dashboard_summary(State(state): State<AppState>) -> Json<DashboardSummary> {
274 Json(state.admin.get_dashboard_summary())
275}
276
277pub async fn get_nodes(State(state): State<AppState>) -> Json<Vec<NodeInfo>> {
279 Json(state.admin.get_nodes())
280}
281
/// Request body accepted by `cluster_join`.
#[derive(Debug, Deserialize)]
pub struct JoinClusterRequest {
    /// Identifier of the joining node.
    pub node_id: String,
    /// Optional display name of the joining node (logged as "unnamed" when absent).
    pub node_name: Option<String>,
    /// Address of the joining node; registered as a peer address by `cluster_join`.
    pub address: String,
}
293
/// Response body produced by `cluster_join`.
#[derive(Debug, Serialize)]
pub struct JoinClusterResponse {
    /// Whether the join was accepted.
    pub success: bool,
    /// Human-readable result message.
    pub message: String,
    /// This node followed by the other known peers (the joining node is excluded).
    pub peers: Vec<PeerInfo>,
}
301
/// Minimal peer description returned to a joining node.
#[derive(Debug, Serialize)]
pub struct PeerInfo {
    /// Peer node identifier.
    pub id: String,
    /// Optional peer display name.
    pub name: Option<String>,
    /// Peer address.
    pub address: String,
}
309
310pub async fn get_node_info(State(state): State<AppState>) -> Json<crate::admin::PeerNode> {
312 Json(state.admin.get_self_info())
313}
314
315pub async fn cluster_join(
317 State(state): State<AppState>,
318 Json(req): Json<JoinClusterRequest>,
319) -> Json<JoinClusterResponse> {
320 use crate::admin::{NodeRole, NodeStatus, PeerNode};
321
322 let peer = PeerNode {
324 id: req.node_id.clone(),
325 name: req.node_name.clone(),
326 address: req.address.clone(),
327 status: NodeStatus::Online,
328 role: NodeRole::Follower,
329 last_seen: std::time::SystemTime::now()
330 .duration_since(std::time::UNIX_EPOCH)
331 .unwrap_or_default()
332 .as_millis() as u64,
333 version: env!("CARGO_PKG_VERSION").to_string(),
334 uptime_seconds: 0,
335 metrics: None,
336 };
337
338 state.admin.register_peer(peer);
339 state.admin.add_peer_address(req.address.clone());
340
341 tracing::info!(
342 "Node joined cluster: {} ({}) at {}",
343 req.node_id,
344 req.node_name.as_deref().unwrap_or("unnamed"),
345 req.address
346 );
347
348 let self_info = state.admin.get_self_info();
350 let mut peers = vec![PeerInfo {
351 id: self_info.id,
352 name: self_info.name,
353 address: self_info.address,
354 }];
355
356 for peer in state.admin.get_peers() {
357 if peer.id != req.node_id {
358 peers.push(PeerInfo {
359 id: peer.id,
360 name: peer.name,
361 address: peer.address,
362 });
363 }
364 }
365
366 Json(JoinClusterResponse {
367 success: true,
368 message: "Successfully joined cluster".to_string(),
369 peers,
370 })
371}
372
/// Request body accepted by `cluster_heartbeat`.
#[derive(Debug, Deserialize)]
pub struct HeartbeatRequest {
    /// Identifier of the reporting node.
    pub node_id: String,
    /// Optional display name of the reporting node.
    pub node_name: Option<String>,
    /// Address of the reporting node.
    pub address: String,
    /// Uptime reported by the node, in seconds.
    pub uptime_seconds: u64,
    /// Optional node metrics, stored on the peer record as-is.
    pub metrics: Option<crate::admin::NodeMetrics>,
}
382
383pub async fn cluster_heartbeat(
385 State(state): State<AppState>,
386 Json(req): Json<HeartbeatRequest>,
387) -> Json<serde_json::Value> {
388 use crate::admin::{NodeRole, NodeStatus, PeerNode};
389
390 let peer = PeerNode {
392 id: req.node_id.clone(),
393 name: req.node_name,
394 address: req.address,
395 status: NodeStatus::Online,
396 role: NodeRole::Follower,
397 last_seen: std::time::SystemTime::now()
398 .duration_since(std::time::UNIX_EPOCH)
399 .unwrap_or_default()
400 .as_millis() as u64,
401 version: env!("CARGO_PKG_VERSION").to_string(),
402 uptime_seconds: req.uptime_seconds,
403 metrics: req.metrics,
404 };
405
406 state.admin.register_peer(peer);
407
408 Json(serde_json::json!({
409 "success": true,
410 "timestamp": std::time::SystemTime::now()
411 .duration_since(std::time::UNIX_EPOCH)
412 .unwrap_or_default()
413 .as_millis()
414 }))
415}
416
417pub async fn get_peers(State(state): State<AppState>) -> Json<Vec<crate::admin::PeerNode>> {
419 Json(state.admin.get_peers())
420}
421
422pub async fn get_storage_info(State(state): State<AppState>) -> Json<StorageInfo> {
424 Json(state.admin.get_storage_info())
425}
426
427pub async fn get_query_stats(State(state): State<AppState>) -> Json<QueryStats> {
429 Json(state.admin.get_query_stats())
430}
431
432pub async fn get_database_stats(
435 State(state): State<AppState>,
436 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
437) -> Json<crate::state::DatabaseStats> {
438 let mut stats = state.get_database_stats();
440
441 if params.get("local").map(|v| v == "true").unwrap_or(false) {
443 return Json(stats);
444 }
445
446 let peers = state.admin.get_peers();
448 let client = reqwest::Client::new();
449
450 for peer in peers {
451 let url = format!("http://{}/api/v1/admin/database?local=true", peer.address);
452 if let Ok(response) = client
453 .get(&url)
454 .timeout(std::time::Duration::from_secs(2))
455 .send()
456 .await
457 {
458 if let Ok(peer_stats) = response.json::<crate::state::DatabaseStats>().await {
459 stats.total_keys += peer_stats.total_keys;
460 stats.total_documents += peer_stats.total_documents;
461 stats.collection_count += peer_stats.collection_count;
462 stats.documents_inserted += peer_stats.documents_inserted;
463 stats.documents_updated += peer_stats.documents_updated;
464 stats.documents_deleted += peer_stats.documents_deleted;
465 stats.queries_executed += peer_stats.queries_executed;
466 }
467 }
468 }
469
470 Json(stats)
471}
472
/// Response body of `get_alerts`.
#[derive(Debug, Serialize)]
pub struct AlertsResponse {
    /// Alerts generated for the current memory and disk usage.
    pub alerts: Vec<AlertInfo>,
}
478
479pub async fn get_alerts(State(_state): State<AppState>) -> Json<AlertsResponse> {
481 use sysinfo::{Disks, System};
482
483 let mut alerts = Vec::new();
484 let now = std::time::SystemTime::now()
485 .duration_since(std::time::UNIX_EPOCH)
486 .unwrap_or_default()
487 .as_millis() as u64;
488
489 let mut sys = System::new();
491 sys.refresh_memory();
492
493 let memory_total = sys.total_memory();
494 let memory_used = sys.used_memory();
495 if memory_total > 0 {
496 let memory_percent = (memory_used as f64 / memory_total as f64) * 100.0;
497 if memory_percent > 90.0 {
498 alerts.push(AlertInfo {
499 id: "mem-critical".to_string(),
500 severity: AlertSeverity::Critical,
501 source: "system".to_string(),
502 message: format!("Critical memory usage: {:.1}%", memory_percent),
503 timestamp: now,
504 acknowledged: false,
505 resolved: false,
506 });
507 } else if memory_percent > 80.0 {
508 alerts.push(AlertInfo {
509 id: "mem-warning".to_string(),
510 severity: AlertSeverity::Warning,
511 source: "system".to_string(),
512 message: format!("High memory usage: {:.1}%", memory_percent),
513 timestamp: now,
514 acknowledged: false,
515 resolved: false,
516 });
517 }
518 }
519
520 let disks = Disks::new_with_refreshed_list();
522 for disk in disks.list() {
523 let total = disk.total_space();
524 let available = disk.available_space();
525 if total > 0 {
526 let used_percent = ((total - available) as f64 / total as f64) * 100.0;
527 let mount = disk.mount_point().to_string_lossy();
528 if used_percent > 95.0 {
529 alerts.push(AlertInfo {
530 id: format!("disk-critical-{}", mount.replace("/", "_")),
531 severity: AlertSeverity::Critical,
532 source: "system".to_string(),
533 message: format!("Critical disk usage on {}: {:.1}%", mount, used_percent),
534 timestamp: now,
535 acknowledged: false,
536 resolved: false,
537 });
538 } else if used_percent > 85.0 {
539 alerts.push(AlertInfo {
540 id: format!("disk-warning-{}", mount.replace("/", "_")),
541 severity: AlertSeverity::Warning,
542 source: "system".to_string(),
543 message: format!("High disk usage on {}: {:.1}%", mount, used_percent),
544 timestamp: now,
545 acknowledged: false,
546 resolved: false,
547 });
548 }
549 }
550 }
551
552 Json(AlertsResponse { alerts })
553}
554
555pub async fn login(
561 State(state): State<AppState>,
562 Json(request): Json<LoginRequest>,
563) -> impl IntoResponse {
564 let response = state.auth.login(&request.username, &request.password);
565
566 if response.error.is_some() {
567 state.activity.log_auth(
568 &format!("Failed login attempt for user: {}", request.username),
569 Some(&request.username),
570 );
571 (StatusCode::UNAUTHORIZED, Json(response))
572 } else if response.requires_mfa == Some(true) {
573 state.activity.log_auth(
574 &format!("MFA required for user: {}", request.username),
575 Some(&request.username),
576 );
577 (StatusCode::OK, Json(response))
578 } else {
579 state.activity.log_auth(
580 &format!("User logged in: {}", request.username),
581 Some(&request.username),
582 );
583 (StatusCode::OK, Json(response))
584 }
585}
586
587pub async fn verify_mfa(
589 State(state): State<AppState>,
590 Json(request): Json<MfaVerifyRequest>,
591) -> impl IntoResponse {
592 let response = state.auth.verify_mfa(&request.code, &request.token);
593
594 if response.error.is_some() {
595 state.activity.log_auth("Failed MFA verification", None);
596 (StatusCode::UNAUTHORIZED, Json(response))
597 } else {
598 let username = response.user.as_ref().map(|u| u.username.as_str());
599 state.activity.log_auth(
600 &format!("MFA verified for user: {}", username.unwrap_or("unknown")),
601 username,
602 );
603 (StatusCode::OK, Json(response))
604 }
605}
606
/// Request body accepted by `logout`.
#[derive(Debug, Deserialize)]
pub struct LogoutRequest {
    /// Session token passed to the auth service's `logout`.
    pub token: String,
}
612
/// Response body produced by `logout`.
#[derive(Debug, Serialize)]
pub struct LogoutResponse {
    /// Value returned by the auth service's `logout` call.
    pub success: bool,
}
618
619pub async fn logout(
621 State(state): State<AppState>,
622 Json(request): Json<LogoutRequest>,
623) -> Json<LogoutResponse> {
624 let success = state.auth.logout(&request.token);
625
626 if success {
627 state.activity.log_auth("User logged out", None);
628 }
629
630 Json(LogoutResponse { success })
631}
632
633pub async fn validate_session(
635 State(state): State<AppState>,
636 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
637) -> impl IntoResponse {
638 let token = params.get("token").map(|s| s.as_str()).unwrap_or("");
639
640 match state.auth.validate_session(token) {
641 Some(user) => {
642 let user_info: UserInfo = user;
643 (StatusCode::OK, Json(Some(user_info)))
644 }
645 None => (StatusCode::UNAUTHORIZED, Json(None::<UserInfo>)),
646 }
647}
648
649pub async fn get_current_user(
651 State(state): State<AppState>,
652 headers: axum::http::HeaderMap,
653) -> impl IntoResponse {
654 let auth_header = headers
655 .get("authorization")
656 .and_then(|v| v.to_str().ok())
657 .unwrap_or("");
658
659 let token = auth_header.strip_prefix("Bearer ").unwrap_or(auth_header);
660
661 match state.auth.validate_session(token) {
662 Some(user) => {
663 let user_info: UserInfo = user;
664 (StatusCode::OK, Json(Some(user_info)))
665 }
666 None => (StatusCode::UNAUTHORIZED, Json(None::<UserInfo>)),
667 }
668}
669
/// Query-string parameters accepted by `get_activities`.
#[derive(Debug, Deserialize)]
pub struct ActivityQuery {
    /// Maximum number of activities to return; defaults to `default_limit()` (50).
    #[serde(default = "default_limit")]
    pub limit: usize,
    /// Optional activity type filter (e.g. `"query"`, `"write"`, `"auth"`);
    /// only consulted when `user` is not set.
    pub activity_type: Option<String>,
    /// Optional user filter; takes precedence over `activity_type`.
    pub user: Option<String>,
}
682
/// Serde default for `ActivityQuery::limit`.
fn default_limit() -> usize {
    const DEFAULT_ACTIVITY_LIMIT: usize = 50;
    DEFAULT_ACTIVITY_LIMIT
}
686
687pub async fn get_activities(
689 State(state): State<AppState>,
690 axum::extract::Query(params): axum::extract::Query<ActivityQuery>,
691) -> Json<Vec<Activity>> {
692 let activities = if let Some(ref user) = params.user {
693 state.activity.get_by_user(user, params.limit)
694 } else if let Some(ref activity_type) = params.activity_type {
695 let at = match activity_type.as_str() {
696 "query" => ActivityType::Query,
697 "write" => ActivityType::Write,
698 "delete" => ActivityType::Delete,
699 "config" => ActivityType::Config,
700 "node" => ActivityType::Node,
701 "auth" => ActivityType::Auth,
702 _ => ActivityType::System,
703 };
704 state.activity.get_by_type(at, params.limit)
705 } else {
706 state.activity.get_recent(params.limit)
707 };
708
709 Json(activities)
710}
711
/// Response body of `list_keys`.
#[derive(Debug, Serialize)]
pub struct ListKeysResponse {
    /// Entries returned by the key-value store for this request.
    pub keys: Vec<KvEntry>,
    /// Number of entries in `keys` (as set by `list_keys`).
    pub total: usize,
}
722
723pub async fn list_keys(
725 State(state): State<AppState>,
726 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
727) -> Json<ListKeysResponse> {
728 let limit = params
729 .get("limit")
730 .and_then(|s| s.parse().ok())
731 .unwrap_or(100);
732 let prefix = params.get("prefix").map(|s| s.as_str());
733
734 state.activity.log(ActivityType::Query, "Listed keys");
735
736 let keys = state.kv_store.list(prefix, limit);
737 let total = keys.len();
738
739 Json(ListKeysResponse { keys, total })
740}
741
/// Request body accepted by `set_key`.
#[derive(Debug, Deserialize)]
pub struct SetKeyRequest {
    /// Key to write.
    pub key: String,
    /// JSON value to store under the key.
    pub value: serde_json::Value,
    /// Optional time-to-live passed to the key-value store
    /// (unit is defined by the store; not visible here).
    pub ttl: Option<u64>,
}
749
750pub async fn set_key(
752 State(state): State<AppState>,
753 Json(request): Json<SetKeyRequest>,
754) -> Json<KvEntry> {
755 state
756 .activity
757 .log_write(&format!("Set key: {}", request.key), None);
758 let entry = state.kv_store.set(request.key, request.value, request.ttl);
759 Json(entry)
760}
761
762pub async fn get_key(State(state): State<AppState>, Path(key): Path<String>) -> impl IntoResponse {
764 match state.kv_store.get(&key) {
765 Some(entry) => (StatusCode::OK, Json(Some(entry))),
766 None => (StatusCode::NOT_FOUND, Json(None)),
767 }
768}
769
770pub async fn delete_key(
772 State(state): State<AppState>,
773 Path(key): Path<String>,
774) -> impl IntoResponse {
775 state
776 .activity
777 .log(ActivityType::Delete, &format!("Delete key: {}", key));
778 match state.kv_store.delete(&key) {
779 Some(_) => (
780 StatusCode::OK,
781 Json(serde_json::json!({"success": true, "key": key})),
782 ),
783 None => (
784 StatusCode::NOT_FOUND,
785 Json(serde_json::json!({"success": false, "error": "Key not found"})),
786 ),
787 }
788}
789
/// Per-collection summary returned by `list_collections`.
#[derive(Debug, Serialize)]
pub struct CollectionInfoResponse {
    /// Collection name, from the document engine's collection stats.
    pub name: String,
    /// Number of documents reported by the collection stats.
    pub document_count: usize,
    /// Number of indexes reported by the collection stats.
    pub index_count: usize,
}
801
802pub async fn list_collections(State(state): State<AppState>) -> Json<Vec<CollectionInfoResponse>> {
804 state
805 .activity
806 .log(ActivityType::Query, "Listed collections");
807
808 let collection_names = state.document_engine.list_collections();
809 let collections: Vec<CollectionInfoResponse> = collection_names
810 .iter()
811 .filter_map(|name| {
812 state
813 .document_engine
814 .collection_stats(name)
815 .map(|stats| CollectionInfoResponse {
816 name: stats.name,
817 document_count: stats.document_count,
818 index_count: stats.index_count,
819 })
820 })
821 .collect();
822
823 Json(collections)
824}
825
/// Serializable view of a single document.
#[derive(Debug, Serialize)]
pub struct DocumentResponse {
    /// Document id rendered as a string.
    pub id: String,
    /// Name of the collection the document was read from.
    pub collection: String,
    /// Document contents as JSON, as produced by `doc_to_json`.
    pub data: serde_json::Value,
}
833
/// Response body for document listing and query endpoints.
#[derive(Debug, Serialize)]
pub struct CollectionQueryResponse {
    /// Matching documents.
    pub documents: Vec<DocumentResponse>,
    /// `total_scanned` value reported by the document engine's query result.
    pub total_scanned: usize,
    /// `execution_time_ms` value reported by the document engine's query result.
    pub execution_time_ms: u64,
}
841
842pub async fn get_collection_documents(
844 State(state): State<AppState>,
845 Path(collection): Path<String>,
846) -> impl IntoResponse {
847 state.activity.log(
848 ActivityType::Query,
849 &format!("Query collection: {}", collection),
850 );
851
852 let query = DocQuery::new();
854 match state.document_engine.find(&collection, &query) {
855 Ok(result) => {
856 let query_result: &DocQueryResult = &result;
858 let docs: Vec<DocumentResponse> = query_result
859 .documents
860 .iter()
861 .map(|doc| DocumentResponse {
862 id: doc.id.to_string(),
863 collection: collection.clone(),
864 data: doc_to_json(doc),
865 })
866 .collect();
867 let response = CollectionQueryResponse {
868 documents: docs,
869 total_scanned: query_result.total_scanned,
870 execution_time_ms: query_result.execution_time_ms,
871 };
872 (StatusCode::OK, Json(response))
873 }
874 Err(_e) => {
875 let empty = CollectionQueryResponse {
876 documents: vec![],
877 total_scanned: 0,
878 execution_time_ms: 0,
879 };
880 (StatusCode::NOT_FOUND, Json(empty))
881 }
882 }
883}
884
885pub async fn get_document(
887 State(state): State<AppState>,
888 Path((collection, id)): Path<(String, String)>,
889) -> impl IntoResponse {
890 state.activity.log(
891 ActivityType::Query,
892 &format!("Get document: {}/{}", collection, id),
893 );
894
895 let doc_id = DocumentId::new(&id);
896 match state.document_engine.get(&collection, &doc_id) {
897 Ok(Some(doc)) => {
898 let response = DocumentResponse {
899 id: doc.id.to_string(),
900 collection: collection.clone(),
901 data: doc_to_json(&doc),
902 };
903 (StatusCode::OK, Json(serde_json::json!(response)))
904 }
905 Ok(None) => (
906 StatusCode::NOT_FOUND,
907 Json(serde_json::json!({"error": "Document not found"})),
908 ),
909 Err(e) => (
910 StatusCode::BAD_REQUEST,
911 Json(serde_json::json!({"error": e.to_string()})),
912 ),
913 }
914}
915
916pub async fn delete_document(
918 State(state): State<AppState>,
919 Path((collection, id)): Path<(String, String)>,
920) -> impl IntoResponse {
921 state.activity.log(
922 ActivityType::Delete,
923 &format!("Delete document: {}/{}", collection, id),
924 );
925
926 let doc_id = DocumentId::new(&id);
927 match state.document_engine.delete(&collection, &doc_id) {
928 Ok(doc) => {
929 state.flush_collection(&collection);
930 let response = DocumentResponse {
931 id: doc.id.to_string(),
932 collection: collection.clone(),
933 data: doc_to_json(&doc),
934 };
935 (
936 StatusCode::OK,
937 Json(serde_json::json!({"success": true, "deleted": response})),
938 )
939 }
940 Err(e) => (
941 StatusCode::NOT_FOUND,
942 Json(serde_json::json!({"success": false, "error": e.to_string()})),
943 ),
944 }
945}
946
/// Request body accepted by `update_document` and `patch_document`.
#[derive(Debug, Deserialize)]
pub struct UpdateDocumentRequest {
    /// Replacement document (for update) or the fields to merge (for patch).
    pub document: serde_json::Value,
}
952
953pub async fn update_document(
955 State(state): State<AppState>,
956 Path((collection, id)): Path<(String, String)>,
957 Json(request): Json<UpdateDocumentRequest>,
958) -> impl IntoResponse {
959 state
960 .activity
961 .log_write(&format!("Update document: {}/{}", collection, id), None);
962
963 let doc_id = DocumentId::new(&id);
964
965 let mut doc = json_to_doc(request.document);
967 doc.id = doc_id.clone();
968
969 match state.document_engine.update(&collection, &doc_id, doc) {
970 Ok(()) => {
971 state.flush_collection(&collection);
972 match state.document_engine.get(&collection, &doc_id) {
974 Ok(Some(updated_doc)) => {
975 let response = DocumentResponse {
976 id: updated_doc.id.to_string(),
977 collection: collection.clone(),
978 data: doc_to_json(&updated_doc),
979 };
980 (
981 StatusCode::OK,
982 Json(serde_json::json!({"success": true, "document": response})),
983 )
984 }
985 _ => (
986 StatusCode::OK,
987 Json(serde_json::json!({"success": true, "id": id})),
988 ),
989 }
990 }
991 Err(e) => (
992 StatusCode::NOT_FOUND,
993 Json(serde_json::json!({"success": false, "error": e.to_string()})),
994 ),
995 }
996}
997
998pub async fn patch_document(
1000 State(state): State<AppState>,
1001 Path((collection, id)): Path<(String, String)>,
1002 Json(request): Json<UpdateDocumentRequest>,
1003) -> impl IntoResponse {
1004 state
1005 .activity
1006 .log_write(&format!("Patch document: {}/{}", collection, id), None);
1007
1008 let doc_id = DocumentId::new(&id);
1009
1010 let existing = match state.document_engine.get(&collection, &doc_id) {
1012 Ok(Some(doc)) => doc,
1013 Ok(None) => {
1014 return (
1015 StatusCode::NOT_FOUND,
1016 Json(serde_json::json!({"success": false, "error": "Document not found"})),
1017 );
1018 }
1019 Err(e) => {
1020 return (
1021 StatusCode::BAD_REQUEST,
1022 Json(serde_json::json!({"success": false, "error": e.to_string()})),
1023 );
1024 }
1025 };
1026
1027 let mut updated_doc = existing.clone();
1029 if let serde_json::Value::Object(patch_map) = request.document {
1030 for (key, value) in patch_map {
1031 updated_doc.set(&key, json_to_doc_value(value));
1032 }
1033 }
1034
1035 match state
1036 .document_engine
1037 .update(&collection, &doc_id, updated_doc)
1038 {
1039 Ok(()) => {
1040 state.flush_collection(&collection);
1041 match state.document_engine.get(&collection, &doc_id) {
1043 Ok(Some(final_doc)) => {
1044 let response = DocumentResponse {
1045 id: final_doc.id.to_string(),
1046 collection: collection.clone(),
1047 data: doc_to_json(&final_doc),
1048 };
1049 (
1050 StatusCode::OK,
1051 Json(serde_json::json!({"success": true, "document": response})),
1052 )
1053 }
1054 _ => (
1055 StatusCode::OK,
1056 Json(serde_json::json!({"success": true, "id": id})),
1057 ),
1058 }
1059 }
1060 Err(e) => (
1061 StatusCode::INTERNAL_SERVER_ERROR,
1062 Json(serde_json::json!({"success": false, "error": e.to_string()})),
1063 ),
1064 }
1065}
1066
/// Request body accepted by `create_collection`.
#[derive(Debug, Deserialize)]
pub struct CreateCollectionRequest {
    /// Name of the collection to create.
    pub name: String,
}
1072
1073pub async fn create_collection(
1075 State(state): State<AppState>,
1076 Json(request): Json<CreateCollectionRequest>,
1077) -> impl IntoResponse {
1078 state
1079 .activity
1080 .log_write(&format!("Create collection: {}", request.name), None);
1081
1082 match state.document_engine.create_collection(&request.name) {
1083 Ok(()) => (
1084 StatusCode::CREATED,
1085 Json(serde_json::json!({"success": true, "collection": request.name})),
1086 ),
1087 Err(e) => (
1088 StatusCode::BAD_REQUEST,
1089 Json(serde_json::json!({"success": false, "error": e.to_string()})),
1090 ),
1091 }
1092}
1093
/// Request body accepted by `insert_document`.
#[derive(Debug, Deserialize)]
pub struct InsertDocumentRequest {
    /// Optional explicit id; `insert_document` copies it into the document's
    /// `_id` field when the document is a JSON object.
    pub id: Option<String>,
    /// Document contents.
    pub document: serde_json::Value,
}
1101
1102pub async fn insert_document(
1104 State(state): State<AppState>,
1105 Path(collection): Path<String>,
1106 Json(request): Json<InsertDocumentRequest>,
1107) -> impl IntoResponse {
1108 state
1109 .activity
1110 .log_write(&format!("Insert document into: {}", collection), None);
1111
1112 let doc_json = if let Some(id) = request.id {
1114 let mut doc = request.document;
1115 if let serde_json::Value::Object(ref mut map) = doc {
1116 map.insert("_id".to_string(), serde_json::Value::String(id));
1117 }
1118 doc
1119 } else {
1120 request.document
1121 };
1122
1123 let doc = json_to_doc(doc_json);
1124 match state.document_engine.insert(&collection, doc) {
1125 Ok(id) => {
1126 state.flush_collection(&collection);
1127 (
1128 StatusCode::CREATED,
1129 Json(serde_json::json!({"success": true, "id": id.to_string()})),
1130 )
1131 }
1132 Err(e) => (
1133 StatusCode::BAD_REQUEST,
1134 Json(serde_json::json!({"success": false, "error": e.to_string()})),
1135 ),
1136 }
1137}
1138
1139fn doc_to_json(doc: &Document) -> serde_json::Value {
1141 let mut map = serde_json::Map::new();
1142 map.insert(
1143 "_id".to_string(),
1144 serde_json::Value::String(doc.id.to_string()),
1145 );
1146 for (key, value) in &doc.data {
1148 map.insert(key.clone(), aegis_doc_value_to_json(value));
1149 }
1150 serde_json::Value::Object(map)
1151}
1152
1153fn aegis_doc_value_to_json(value: &aegis_document::Value) -> serde_json::Value {
1155 match value {
1156 aegis_document::Value::Null => serde_json::Value::Null,
1157 aegis_document::Value::Bool(b) => serde_json::Value::Bool(*b),
1158 aegis_document::Value::Int(i) => serde_json::Value::Number((*i).into()),
1159 aegis_document::Value::Float(f) => serde_json::Number::from_f64(*f)
1160 .map(serde_json::Value::Number)
1161 .unwrap_or(serde_json::Value::Null),
1162 aegis_document::Value::String(s) => serde_json::Value::String(s.clone()),
1163 aegis_document::Value::Array(arr) => {
1164 serde_json::Value::Array(arr.iter().map(aegis_doc_value_to_json).collect())
1165 }
1166 aegis_document::Value::Object(obj) => {
1167 let map: serde_json::Map<String, serde_json::Value> = obj
1168 .iter()
1169 .map(|(k, v)| (k.clone(), aegis_doc_value_to_json(v)))
1170 .collect();
1171 serde_json::Value::Object(map)
1172 }
1173 }
1174}
1175
1176fn json_to_doc(json: serde_json::Value) -> Document {
1178 let doc_id = json
1181 .get("_id")
1182 .or_else(|| json.get("id"))
1183 .and_then(|v| v.as_str());
1184
1185 let mut doc = match doc_id {
1186 Some(id) => Document::with_id(id),
1187 None => Document::new(),
1188 };
1189
1190 if let serde_json::Value::Object(map) = json {
1191 for (key, value) in map {
1192 if key != "_id" {
1194 doc.set(&key, json_to_doc_value(value));
1195 }
1196 }
1197 }
1198 doc
1199}
1200
1201fn json_to_doc_value(json: serde_json::Value) -> aegis_document::Value {
1203 match json {
1204 serde_json::Value::Null => aegis_document::Value::Null,
1205 serde_json::Value::Bool(b) => aegis_document::Value::Bool(b),
1206 serde_json::Value::Number(n) => {
1207 if let Some(i) = n.as_i64() {
1208 aegis_document::Value::Int(i)
1209 } else if let Some(f) = n.as_f64() {
1210 aegis_document::Value::Float(f)
1211 } else {
1212 aegis_document::Value::Null
1213 }
1214 }
1215 serde_json::Value::String(s) => aegis_document::Value::String(s),
1216 serde_json::Value::Array(arr) => {
1217 aegis_document::Value::Array(arr.into_iter().map(json_to_doc_value).collect())
1218 }
1219 serde_json::Value::Object(map) => aegis_document::Value::Object(
1220 map.into_iter()
1221 .map(|(k, v)| (k, json_to_doc_value(v)))
1222 .collect(),
1223 ),
1224 }
1225}
1226
1227pub async fn list_collection_documents(
1229 State(state): State<AppState>,
1230 Path(collection): Path<String>,
1231 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
1232) -> impl IntoResponse {
1233 state.activity.log(
1234 ActivityType::Query,
1235 &format!("List documents in: {}", collection),
1236 );
1237
1238 let limit = params.get("limit").and_then(|s| s.parse().ok());
1239 let skip = params.get("skip").and_then(|s| s.parse().ok());
1240
1241 let mut query = DocQuery::new();
1242 if let Some(limit) = limit {
1243 query = query.with_limit(limit);
1244 }
1245 if let Some(skip) = skip {
1246 query = query.with_skip(skip);
1247 }
1248
1249 match state.document_engine.find(&collection, &query) {
1250 Ok(result) => {
1251 let docs: Vec<DocumentResponse> = result
1252 .documents
1253 .iter()
1254 .map(|doc| DocumentResponse {
1255 id: doc.id.to_string(),
1256 collection: collection.clone(),
1257 data: doc_to_json(doc),
1258 })
1259 .collect();
1260 let response = CollectionQueryResponse {
1261 documents: docs,
1262 total_scanned: result.total_scanned,
1263 execution_time_ms: result.execution_time_ms,
1264 };
1265 (StatusCode::OK, Json(response))
1266 }
1267 Err(_e) => {
1268 let empty = CollectionQueryResponse {
1269 documents: vec![],
1270 total_scanned: 0,
1271 execution_time_ms: 0,
1272 };
1273 (StatusCode::NOT_FOUND, Json(empty))
1274 }
1275 }
1276}
1277
/// Request body accepted by `query_collection_documents`.
#[derive(Debug, Deserialize)]
pub struct DocumentQueryRequest {
    /// Filter specification; when it is a JSON object, each `field: condition`
    /// entry is parsed into a filter. Defaults to JSON `null` when absent.
    #[serde(default)]
    pub filter: serde_json::Value,
    /// Optional maximum number of documents to return.
    pub limit: Option<usize>,
    /// Optional number of documents to skip.
    pub skip: Option<usize>,
    /// Optional sort specification.
    pub sort: Option<SortSpec>,
}
1287
/// Sort specification used inside a [`DocumentQueryRequest`].
#[derive(Debug, Deserialize)]
pub struct SortSpec {
    /// Name of the field to sort by.
    pub field: String,
    /// Sort direction; defaults to `true` (ascending) when omitted.
    #[serde(default = "default_ascending")]
    pub ascending: bool,
}
1295
/// Serde default for `SortSpec::ascending`: sort ascending unless told otherwise.
fn default_ascending() -> bool {
    const ASCENDING_BY_DEFAULT: bool = true;
    ASCENDING_BY_DEFAULT
}
1299
1300pub async fn query_collection_documents(
1303 State(state): State<AppState>,
1304 Path(collection): Path<String>,
1305 Json(request): Json<DocumentQueryRequest>,
1306) -> impl IntoResponse {
1307 state.activity.log(
1308 ActivityType::Query,
1309 &format!("Query collection: {}", collection),
1310 );
1311
1312 let mut query = DocQuery::new();
1314
1315 if let serde_json::Value::Object(filter_map) = &request.filter {
1316 for (field, condition) in filter_map {
1317 if let Some(filter) = parse_filter_condition(field, condition) {
1318 query = query.with_filter(filter);
1319 }
1320 }
1321 }
1322
1323 if let Some(limit) = request.limit {
1324 query = query.with_limit(limit);
1325 }
1326 if let Some(skip) = request.skip {
1327 query = query.with_skip(skip);
1328 }
1329 if let Some(ref sort) = request.sort {
1330 query = query.with_sort(&sort.field, sort.ascending);
1331 }
1332
1333 match state.document_engine.find(&collection, &query) {
1334 Ok(result) => {
1335 let docs: Vec<DocumentResponse> = result
1336 .documents
1337 .iter()
1338 .map(|doc| DocumentResponse {
1339 id: doc.id.to_string(),
1340 collection: collection.clone(),
1341 data: doc_to_json(doc),
1342 })
1343 .collect();
1344 let response = CollectionQueryResponse {
1345 documents: docs,
1346 total_scanned: result.total_scanned,
1347 execution_time_ms: result.execution_time_ms,
1348 };
1349 (StatusCode::OK, Json(response))
1350 }
1351 Err(_) => {
1352 let empty = CollectionQueryResponse {
1353 documents: vec![],
1354 total_scanned: 0,
1355 execution_time_ms: 0,
1356 };
1357 (StatusCode::NOT_FOUND, Json(empty))
1358 }
1359 }
1360}
1361
1362fn parse_filter_condition(
1364 field: &str,
1365 condition: &serde_json::Value,
1366) -> Option<aegis_document::query::Filter> {
1367 use aegis_document::query::Filter;
1368
1369 match condition {
1370 serde_json::Value::Null
1372 | serde_json::Value::Bool(_)
1373 | serde_json::Value::Number(_)
1374 | serde_json::Value::String(_) => Some(Filter::Eq {
1375 field: field.to_string(),
1376 value: json_to_doc_value(condition.clone()),
1377 }),
1378 serde_json::Value::Object(ops) => {
1380 if field == "$and" {
1382 if let serde_json::Value::Array(arr) = condition {
1383 let filters: Vec<Filter> = arr
1384 .iter()
1385 .filter_map(|item| {
1386 if let serde_json::Value::Object(obj) = item {
1387 obj.iter()
1388 .filter_map(|(k, v)| parse_filter_condition(k, v))
1389 .next()
1390 } else {
1391 None
1392 }
1393 })
1394 .collect();
1395 return Some(Filter::And(filters));
1396 }
1397 return None;
1398 }
1399 if field == "$or" {
1400 if let serde_json::Value::Array(arr) = condition {
1401 let filters: Vec<Filter> = arr
1402 .iter()
1403 .filter_map(|item| {
1404 if let serde_json::Value::Object(obj) = item {
1405 obj.iter()
1406 .filter_map(|(k, v)| parse_filter_condition(k, v))
1407 .next()
1408 } else {
1409 None
1410 }
1411 })
1412 .collect();
1413 return Some(Filter::Or(filters));
1414 }
1415 return None;
1416 }
1417
1418 let mut filters: Vec<Filter> = Vec::new();
1420
1421 for (op, value) in ops {
1422 let filter = match op.as_str() {
1423 "$eq" => Some(Filter::Eq {
1424 field: field.to_string(),
1425 value: json_to_doc_value(value.clone()),
1426 }),
1427 "$ne" => Some(Filter::Ne {
1428 field: field.to_string(),
1429 value: json_to_doc_value(value.clone()),
1430 }),
1431 "$gt" => Some(Filter::Gt {
1432 field: field.to_string(),
1433 value: json_to_doc_value(value.clone()),
1434 }),
1435 "$gte" => Some(Filter::Gte {
1436 field: field.to_string(),
1437 value: json_to_doc_value(value.clone()),
1438 }),
1439 "$lt" => Some(Filter::Lt {
1440 field: field.to_string(),
1441 value: json_to_doc_value(value.clone()),
1442 }),
1443 "$lte" => Some(Filter::Lte {
1444 field: field.to_string(),
1445 value: json_to_doc_value(value.clone()),
1446 }),
1447 "$in" => {
1448 if let serde_json::Value::Array(arr) = value {
1449 Some(Filter::In {
1450 field: field.to_string(),
1451 values: arr.iter().map(|v| json_to_doc_value(v.clone())).collect(),
1452 })
1453 } else {
1454 None
1455 }
1456 }
1457 "$nin" => {
1458 if let serde_json::Value::Array(arr) = value {
1459 Some(Filter::Nin {
1460 field: field.to_string(),
1461 values: arr.iter().map(|v| json_to_doc_value(v.clone())).collect(),
1462 })
1463 } else {
1464 None
1465 }
1466 }
1467 "$exists" => {
1468 if let serde_json::Value::Bool(b) = value {
1469 Some(Filter::Exists {
1470 field: field.to_string(),
1471 exists: *b,
1472 })
1473 } else {
1474 None
1475 }
1476 }
1477 "$regex" => {
1478 if let serde_json::Value::String(pattern) = value {
1479 Some(Filter::Regex {
1480 field: field.to_string(),
1481 pattern: pattern.clone(),
1482 })
1483 } else {
1484 None
1485 }
1486 }
1487 "$contains" => {
1488 if let serde_json::Value::String(s) = value {
1489 Some(Filter::Contains {
1490 field: field.to_string(),
1491 value: s.clone(),
1492 })
1493 } else {
1494 None
1495 }
1496 }
1497 "$startsWith" => {
1498 if let serde_json::Value::String(s) = value {
1499 Some(Filter::StartsWith {
1500 field: field.to_string(),
1501 value: s.clone(),
1502 })
1503 } else {
1504 None
1505 }
1506 }
1507 "$endsWith" => {
1508 if let serde_json::Value::String(s) = value {
1509 Some(Filter::EndsWith {
1510 field: field.to_string(),
1511 value: s.clone(),
1512 })
1513 } else {
1514 None
1515 }
1516 }
1517 _ => None,
1518 };
1519
1520 if let Some(f) = filter {
1521 filters.push(f);
1522 }
1523 }
1524
1525 match filters.len() {
1527 0 => None,
1528 1 => filters.into_iter().next(),
1529 _ => Some(Filter::And(filters)),
1530 }
1531 }
1532 serde_json::Value::Array(_) => None,
1533 }
1534}
1535
/// JSON body accepted by `register_metric`.
#[derive(Debug, Deserialize)]
pub struct RegisterMetricRequest {
    /// Name of the metric to register.
    pub name: String,
    /// Metric kind as text; defaults to `"gauge"` when omitted.
    #[serde(default = "default_metric_type")]
    pub metric_type: String,
    /// Optional description stored on the metric.
    pub description: Option<String>,
    /// Optional unit stored on the metric.
    pub unit: Option<String>,
}
1549
/// Serde default for `RegisterMetricRequest::metric_type`.
fn default_metric_type() -> String {
    String::from("gauge")
}
1553
1554pub async fn register_metric(
1556 State(state): State<AppState>,
1557 Json(request): Json<RegisterMetricRequest>,
1558) -> impl IntoResponse {
1559 state
1560 .activity
1561 .log_write(&format!("Register metric: {}", request.name), None);
1562
1563 let metric_type = match request.metric_type.to_lowercase().as_str() {
1564 "counter" => MetricType::Counter,
1565 "gauge" => MetricType::Gauge,
1566 "histogram" => MetricType::Histogram,
1567 "summary" => MetricType::Summary,
1568 _ => MetricType::Gauge,
1569 };
1570
1571 let mut metric = Metric::new(&request.name);
1572 metric.metric_type = metric_type;
1573 metric.description = request.description;
1574 metric.unit = request.unit;
1575
1576 match state.timeseries_engine.register_metric(metric) {
1577 Ok(()) => (
1578 StatusCode::CREATED,
1579 Json(serde_json::json!({
1580 "success": true,
1581 "metric": request.name
1582 })),
1583 ),
1584 Err(e) => (
1585 StatusCode::BAD_REQUEST,
1586 Json(serde_json::json!({
1587 "success": false,
1588 "error": e.to_string()
1589 })),
1590 ),
1591 }
1592}
1593
/// JSON body accepted by `write_timeseries`.
#[derive(Debug, Deserialize)]
pub struct WriteTimeSeriesRequest {
    /// Name of the metric the point belongs to.
    pub metric: String,
    /// Tag key/value pairs attached to the point; empty when omitted.
    #[serde(default)]
    pub tags: std::collections::HashMap<String, String>,
    /// Value of the data point.
    pub value: f64,
    /// Optional Unix timestamp in seconds; the handler uses the current time
    /// when it is absent.
    pub timestamp: Option<i64>,
}
1603
1604pub async fn write_timeseries(
1606 State(state): State<AppState>,
1607 Json(request): Json<WriteTimeSeriesRequest>,
1608) -> impl IntoResponse {
1609 state
1610 .activity
1611 .log_write(&format!("Write timeseries: {}", request.metric), None);
1612
1613 let mut tags = Tags::new();
1614 for (k, v) in request.tags {
1615 tags.insert(&k, &v);
1616 }
1617
1618 let point = if let Some(ts) = request.timestamp {
1619 DataPoint {
1620 timestamp: chrono::DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now),
1621 value: request.value,
1622 }
1623 } else {
1624 DataPoint {
1625 timestamp: Utc::now(),
1626 value: request.value,
1627 }
1628 };
1629
1630 match state.timeseries_engine.write(&request.metric, tags, point) {
1631 Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
1632 Err(e) => (
1633 StatusCode::BAD_REQUEST,
1634 Json(serde_json::json!({"success": false, "error": e.to_string()})),
1635 ),
1636 }
1637}
1638
/// JSON body accepted by `query_timeseries`.
#[derive(Debug, Deserialize)]
pub struct QueryTimeSeriesRequest {
    /// Name of the metric to query.
    pub metric: String,
    /// Optional tag key/value pairs passed to the query.
    #[serde(default)]
    pub tags: Option<std::collections::HashMap<String, String>>,
    /// Optional range start.
    // NOTE(review): `query_timeseries` does not read this field.
    pub start: Option<i64>,
    /// Optional range end.
    // NOTE(review): `query_timeseries` does not read this field.
    pub end: Option<i64>,
    /// Optional limit passed to the query.
    pub limit: Option<usize>,
}
1649
/// Response body produced by `query_timeseries`.
#[derive(Debug, Serialize)]
pub struct TimeSeriesResponse {
    /// Name of the metric that was queried (echoed from the request).
    pub metric: String,
    /// One entry per series returned by the engine.
    pub series: Vec<SeriesResponse>,
    /// Point count as reported by the engine's query result.
    pub points_returned: usize,
    /// Query time in milliseconds as reported by the engine's query result.
    pub query_time_ms: u64,
}
1658
/// One series inside a [`TimeSeriesResponse`].
#[derive(Debug, Serialize)]
pub struct SeriesResponse {
    /// Tag key/value pairs of the series.
    pub tags: std::collections::HashMap<String, String>,
    /// Data points of the series.
    pub points: Vec<PointResponse>,
}
1664
/// One data point inside a [`SeriesResponse`].
#[derive(Debug, Serialize)]
pub struct PointResponse {
    /// Unix timestamp of the point in seconds.
    pub timestamp: i64,
    /// Value of the point.
    pub value: f64,
}
1670
1671pub async fn query_timeseries(
1673 State(state): State<AppState>,
1674 Json(request): Json<QueryTimeSeriesRequest>,
1675) -> impl IntoResponse {
1676 state.activity.log(
1677 ActivityType::Query,
1678 &format!("Query timeseries: {}", request.metric),
1679 );
1680
1681 let duration = Duration::hours(24); let mut query = TimeSeriesQuery::last(&request.metric, duration);
1683
1684 if let Some(limit) = request.limit {
1685 query = query.with_limit(limit);
1686 }
1687
1688 if let Some(ref tags_map) = request.tags {
1689 let mut tags = Tags::new();
1690 for (k, v) in tags_map {
1691 tags.insert(k, v);
1692 }
1693 query = query.with_tags(tags);
1694 }
1695
1696 let result = state.timeseries_engine.query(&query);
1697
1698 let series: Vec<SeriesResponse> = result
1699 .series
1700 .iter()
1701 .map(|s| SeriesResponse {
1702 tags: s.tags.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
1703 points: s
1704 .points
1705 .iter()
1706 .map(|p| PointResponse {
1707 timestamp: p.timestamp.timestamp(),
1708 value: p.value,
1709 })
1710 .collect(),
1711 })
1712 .collect();
1713
1714 let response = TimeSeriesResponse {
1715 metric: request.metric,
1716 series,
1717 points_returned: result.points_returned,
1718 query_time_ms: result.query_time_ms,
1719 };
1720
1721 (StatusCode::OK, Json(response))
1722}
1723
/// Description of a registered metric as returned by `list_metrics`.
#[derive(Debug, Serialize)]
pub struct MetricInfoResponse {
    /// Metric name.
    pub name: String,
    /// Metric kind as lowercase text: `counter`, `gauge`, `histogram` or `summary`.
    pub metric_type: String,
    /// Optional description of the metric.
    pub description: Option<String>,
    /// Optional unit of the metric.
    pub unit: Option<String>,
}
1732
1733impl From<&Metric> for MetricInfoResponse {
1734 fn from(m: &Metric) -> Self {
1735 Self {
1736 name: m.name.clone(),
1737 metric_type: match m.metric_type {
1738 MetricType::Counter => "counter".to_string(),
1739 MetricType::Gauge => "gauge".to_string(),
1740 MetricType::Histogram => "histogram".to_string(),
1741 MetricType::Summary => "summary".to_string(),
1742 },
1743 description: m.description.clone(),
1744 unit: m.unit.clone(),
1745 }
1746 }
1747}
1748
1749pub async fn list_metrics(State(state): State<AppState>) -> Json<Vec<MetricInfoResponse>> {
1751 state.activity.log(ActivityType::Query, "Listed metrics");
1752 let metrics = state.timeseries_engine.list_metrics();
1753 Json(metrics.iter().map(MetricInfoResponse::from).collect())
1754}
1755
/// JSON body accepted by `create_channel`.
#[derive(Debug, Deserialize)]
pub struct CreateChannelRequest {
    /// Identifier of the channel to create.
    pub id: String,
}
1765
1766pub async fn create_channel(
1768 State(state): State<AppState>,
1769 Json(request): Json<CreateChannelRequest>,
1770) -> impl IntoResponse {
1771 state
1772 .activity
1773 .log_write(&format!("Create channel: {}", request.id), None);
1774
1775 match state.streaming_engine.create_channel(request.id.clone()) {
1776 Ok(()) => (
1777 StatusCode::CREATED,
1778 Json(serde_json::json!({"success": true, "channel": request.id})),
1779 ),
1780 Err(e) => (
1781 StatusCode::BAD_REQUEST,
1782 Json(serde_json::json!({"success": false, "error": e.to_string()})),
1783 ),
1784 }
1785}
1786
1787pub async fn list_channels(State(state): State<AppState>) -> Json<Vec<String>> {
1789 state.activity.log(ActivityType::Query, "Listed channels");
1790 let channels: Vec<String> = state
1791 .streaming_engine
1792 .list_channels()
1793 .into_iter()
1794 .map(|c| c.to_string())
1795 .collect();
1796 Json(channels)
1797}
1798
/// JSON body accepted by `publish_event`.
#[derive(Debug, Deserialize)]
pub struct PublishEventRequest {
    /// Identifier of the channel to publish to.
    pub channel: String,
    /// Event type: `created`, `updated` or `deleted`; any other text is treated
    /// as a custom event type.
    pub event_type: String,
    /// Source passed to the event.
    pub source: String,
    /// Event payload as arbitrary JSON.
    pub data: serde_json::Value,
}
1807
1808pub async fn publish_event(
1810 State(state): State<AppState>,
1811 Json(request): Json<PublishEventRequest>,
1812) -> impl IntoResponse {
1813 state
1814 .activity
1815 .log_write(&format!("Publish to channel: {}", request.channel), None);
1816
1817 let event_type = match request.event_type.as_str() {
1818 "created" => StreamEventType::Created,
1819 "updated" => StreamEventType::Updated,
1820 "deleted" => StreamEventType::Deleted,
1821 _ => StreamEventType::Custom(request.event_type.clone()),
1822 };
1823
1824 let data = match request.data {
1825 serde_json::Value::String(s) => EventData::String(s),
1826 serde_json::Value::Number(n) => {
1827 if let Some(i) = n.as_i64() {
1828 EventData::Int(i)
1829 } else if let Some(f) = n.as_f64() {
1830 EventData::Float(f)
1831 } else {
1832 EventData::Null
1833 }
1834 }
1835 serde_json::Value::Bool(b) => EventData::Bool(b),
1836 serde_json::Value::Null => EventData::Null,
1837 _ => EventData::Json(request.data.clone()),
1838 };
1839
1840 let event = Event::new(event_type, &request.source, data);
1841 let channel_id = ChannelId::new(&request.channel);
1842
1843 match state.streaming_engine.publish(&channel_id, event) {
1844 Ok(receivers) => (
1845 StatusCode::OK,
1846 Json(serde_json::json!({"success": true, "receivers": receivers})),
1847 ),
1848 Err(e) => (
1849 StatusCode::BAD_REQUEST,
1850 Json(serde_json::json!({"success": false, "error": e.to_string()})),
1851 ),
1852 }
1853}
1854
1855pub async fn get_channel_history(
1857 State(state): State<AppState>,
1858 Path(channel): Path<String>,
1859 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
1860) -> impl IntoResponse {
1861 let count = params
1862 .get("count")
1863 .and_then(|s| s.parse().ok())
1864 .unwrap_or(100);
1865 let channel_id = ChannelId::new(&channel);
1866
1867 match state.streaming_engine.get_history(&channel_id, count) {
1868 Ok(events) => {
1869 let event_data: Vec<serde_json::Value> = events
1870 .iter()
1871 .map(|e| {
1872 serde_json::json!({
1873 "id": e.id.to_string(),
1874 "event_type": format!("{:?}", e.event_type),
1875 "source": e.source,
1876 "timestamp": e.timestamp,
1877 })
1878 })
1879 .collect();
1880 (
1881 StatusCode::OK,
1882 Json(serde_json::json!({"events": event_data})),
1883 )
1884 }
1885 Err(e) => (
1886 StatusCode::NOT_FOUND,
1887 Json(serde_json::json!({"error": e.to_string()})),
1888 ),
1889 }
1890}
1891
/// Response body produced by `get_graph_data`.
#[derive(Debug, Serialize)]
pub struct GraphDataResponse {
    /// All nodes returned by the graph store.
    pub nodes: Vec<GraphNode>,
    /// All edges returned by the graph store.
    pub edges: Vec<GraphEdge>,
}
1902
1903pub async fn get_graph_data(State(state): State<AppState>) -> Json<GraphDataResponse> {
1905 state.activity.log(ActivityType::Query, "Query graph data");
1906
1907 let (nodes, edges) = state.graph_store.get_all();
1908
1909 Json(GraphDataResponse { nodes, edges })
1910}
1911
/// JSON body accepted by `execute_builder_query`.
#[derive(Debug, Deserialize)]
pub struct ExecuteQueryRequest {
    /// Query text handed to the query engine.
    pub query: String,
    /// Optional database name handed to the query engine.
    #[serde(default)]
    pub database: Option<String>,
}
1923
/// Response body produced by `execute_builder_query`.
#[derive(Debug, Serialize)]
pub struct ExecuteQueryResponse {
    /// `true` when the query engine returned a result, `false` on error.
    pub success: bool,
    /// Column names of the result; empty on error.
    pub columns: Vec<String>,
    /// Result rows; empty on error.
    pub rows: Vec<Vec<serde_json::Value>>,
    /// Taken from the engine result's `rows_affected`; `0` on error.
    pub row_count: usize,
    /// Wall-clock time spent in the handler, in milliseconds.
    pub execution_time_ms: u64,
    /// Error message when `success` is `false`.
    pub error: Option<String>,
}
1934
1935pub async fn execute_builder_query(
1937 State(state): State<AppState>,
1938 Json(request): Json<ExecuteQueryRequest>,
1939) -> Json<ExecuteQueryResponse> {
1940 let start = std::time::Instant::now();
1941 state.activity.log_query(&request.query, 0, None);
1942
1943 match state
1945 .query_engine
1946 .execute(&request.query, request.database.as_deref())
1947 {
1948 Ok(result) => Json(ExecuteQueryResponse {
1949 success: true,
1950 columns: result.columns,
1951 rows: result.rows,
1952 row_count: result.rows_affected as usize,
1953 execution_time_ms: start.elapsed().as_millis() as u64,
1954 error: None,
1955 }),
1956 Err(e) => Json(ExecuteQueryResponse {
1957 success: false,
1958 columns: vec![],
1959 rows: vec![],
1960 row_count: 0,
1961 execution_time_ms: start.elapsed().as_millis() as u64,
1962 error: Some(e.to_string()),
1963 }),
1964 }
1965}
1966
/// Response body shared by the node action handlers (`restart_node`,
/// `drain_node`, `remove_node`).
#[derive(Debug, Serialize)]
pub struct NodeActionResponse {
    /// Whether the action was carried out.
    pub success: bool,
    /// Human-readable description of the outcome.
    pub message: String,
    /// Node identifier taken from the request path.
    pub node_id: String,
}
1978
1979pub async fn restart_node(
1982 State(state): State<AppState>,
1983 Path(node_id): Path<String>,
1984) -> Json<NodeActionResponse> {
1985 state
1986 .activity
1987 .log_node(&format!("Restarting node: {}", node_id));
1988
1989 let peers = state.admin.get_peers();
1991 let peer = peers
1992 .iter()
1993 .find(|p| p.id == node_id || p.name.as_deref() == Some(&node_id));
1994
1995 if let Some(peer) = peer {
1996 let address = peer.address.clone();
1997 let client = reqwest::Client::builder()
1999 .timeout(std::time::Duration::from_secs(5))
2000 .build()
2001 .unwrap_or_default();
2002 let url = format!("{}/api/v1/cluster/shutdown", address);
2003 tokio::spawn(async move {
2004 let _ = client.post(&url).send().await;
2005 });
2006
2007 Json(NodeActionResponse {
2008 success: true,
2009 message: format!(
2010 "Node {} restart initiated at {}. PM2 will auto-restart.",
2011 node_id, address
2012 ),
2013 node_id,
2014 })
2015 } else {
2016 Json(NodeActionResponse {
2017 success: false,
2018 message: format!("Node {} not found in cluster peers.", node_id),
2019 node_id,
2020 })
2021 }
2022}
2023
2024pub async fn drain_node(
2026 State(state): State<AppState>,
2027 Path(node_id): Path<String>,
2028) -> Json<NodeActionResponse> {
2029 state
2030 .activity
2031 .log_node(&format!("Draining node: {}", node_id));
2032
2033 let peers = state.admin.get_peers();
2035 let found = peers
2036 .iter()
2037 .any(|p| p.id == node_id || p.name.as_deref() == Some(&node_id));
2038
2039 if found {
2040 state.admin.mark_peer_offline(&node_id);
2042
2043 Json(NodeActionResponse {
2044 success: true,
2045 message: format!(
2046 "Node {} marked as draining. Traffic will be redirected to other nodes.",
2047 node_id
2048 ),
2049 node_id,
2050 })
2051 } else {
2052 Json(NodeActionResponse {
2053 success: false,
2054 message: format!("Node {} not found in cluster peers.", node_id),
2055 node_id,
2056 })
2057 }
2058}
2059
2060pub async fn remove_node(
2062 State(state): State<AppState>,
2063 Path(node_id): Path<String>,
2064) -> impl IntoResponse {
2065 state
2066 .activity
2067 .log_node(&format!("Removing node from cluster: {}", node_id));
2068
2069 state.admin.remove_peer(&node_id);
2071
2072 (
2073 StatusCode::OK,
2074 Json(NodeActionResponse {
2075 success: true,
2076 message: format!("Node {} has been removed from the cluster.", node_id),
2077 node_id,
2078 }),
2079 )
2080}
2081
2082pub async fn cluster_shutdown(State(state): State<AppState>) -> impl IntoResponse {
2085 state
2086 .activity
2087 .log_node("Graceful shutdown initiated via cluster API");
2088
2089 state.timeseries_engine.flush();
2091
2092 tokio::spawn(async {
2094 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2095 std::process::exit(0);
2096 });
2097
2098 (
2099 StatusCode::OK,
2100 Json(serde_json::json!({
2101 "status": "shutting_down",
2102 "message": "Node will restart via PM2"
2103 })),
2104 )
2105}
2106
/// One log line inside a [`NodeLogsResponse`], derived from an activity record.
#[derive(Debug, Serialize)]
pub struct NodeLogEntry {
    /// Timestamp copied from the activity record.
    pub timestamp: String,
    /// Log level: `"INFO"` or `"WARN"`.
    pub level: String,
    /// Description copied from the activity record.
    pub message: String,
}
2114
/// Response body produced by `get_node_logs`.
#[derive(Debug, Serialize)]
pub struct NodeLogsResponse {
    /// Node identifier taken from the request path.
    pub node_id: String,
    /// Log entries, at most `limit` of them.
    pub logs: Vec<NodeLogEntry>,
    /// Number of entries built from the recent activity records.
    pub total: usize,
}
2122
2123pub async fn get_node_logs(
2125 State(state): State<AppState>,
2126 Path(node_id): Path<String>,
2127 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
2128) -> Json<NodeLogsResponse> {
2129 let limit: usize = params
2130 .get("limit")
2131 .and_then(|l| l.parse().ok())
2132 .unwrap_or(100);
2133
2134 let activities = state.activity.get_recent(limit);
2136 let logs: Vec<NodeLogEntry> = activities
2137 .iter()
2138 .map(|a| NodeLogEntry {
2139 timestamp: a.timestamp.clone(),
2140 level: match a.activity_type {
2141 ActivityType::Auth | ActivityType::System => "INFO".to_string(),
2142 ActivityType::Write | ActivityType::Delete => "WARN".to_string(),
2143 ActivityType::Query | ActivityType::Config | ActivityType::Node => {
2144 "INFO".to_string()
2145 }
2146 },
2147 message: a.description.clone(),
2148 })
2149 .collect();
2150
2151 let total = logs.len();
2152 Json(NodeLogsResponse {
2153 node_id,
2154 logs: logs.into_iter().take(limit).collect(),
2155 total,
2156 })
2157}
2158
/// Server-wide settings returned by `get_settings` and replaced as a whole by
/// `update_settings`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerSettings {
    /// Replication factor (default `3`).
    pub replication_factor: u8,
    /// Whether automatic backups are enabled (default `true`).
    pub auto_backups_enabled: bool,
    /// Backup schedule (default `"0 2 * * *"`; looks like a cron expression —
    /// TODO confirm the expected format).
    pub backup_schedule: String,
    /// Retention in days (default `30`).
    pub retention_days: u32,
    /// Whether TLS is enabled (default `false`).
    pub tls_enabled: bool,
    /// Whether authentication is required (default `true`).
    pub auth_required: bool,
    /// Session timeout in minutes (default `60`).
    pub session_timeout_minutes: u32,
    /// Whether two-factor authentication is required (default `false`).
    pub require_2fa: bool,
    /// Whether audit logging is enabled (default `true`).
    pub audit_logging_enabled: bool,
}
2176
2177impl Default for ServerSettings {
2178 fn default() -> Self {
2179 Self {
2180 replication_factor: 3,
2181 auto_backups_enabled: true,
2182 backup_schedule: "0 2 * * *".to_string(),
2183 retention_days: 30,
2184 tls_enabled: false,
2185 auth_required: true,
2186 session_timeout_minutes: 60,
2187 require_2fa: false,
2188 audit_logging_enabled: true,
2189 }
2190 }
2191}
2192
2193pub async fn get_settings(State(state): State<AppState>) -> Json<ServerSettings> {
2195 state
2196 .activity
2197 .log(ActivityType::Config, "Retrieved server settings");
2198 let settings = state.settings.read().await;
2199 Json(settings.clone())
2200}
2201
2202pub async fn update_settings(
2204 State(state): State<AppState>,
2205 Json(new_settings): Json<ServerSettings>,
2206) -> impl IntoResponse {
2207 state.activity.log_config("Updated server settings", None);
2208 let mut settings = state.settings.write().await;
2209 *settings = new_settings.clone();
2210 drop(settings);
2211 state.save_settings().await;
2212 (
2213 StatusCode::OK,
2214 Json(serde_json::json!({"success": true, "settings": new_settings})),
2215 )
2216}
2217
/// One user as returned by `list_users`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserListItem {
    /// User identifier.
    pub id: String,
    /// Login name.
    pub username: String,
    /// E-mail address.
    pub email: String,
    /// Role name in lowercase.
    pub role: String,
    /// Whether multi-factor authentication is enabled for the user.
    pub mfa_enabled: bool,
    /// Whether the account is enabled (`list_users` always reports `true`).
    pub enabled: bool,
    /// Creation time as text.
    pub created_at: String,
    /// Last login time, if known (`list_users` always reports `None`).
    pub last_login: Option<String>,
}
2234
2235pub async fn list_users(State(state): State<AppState>) -> Json<Vec<UserListItem>> {
2237 state.activity.log(ActivityType::Query, "Listed users");
2238 let users = state.auth.list_users();
2239 let user_list: Vec<UserListItem> = users
2240 .iter()
2241 .map(|u| UserListItem {
2242 id: u.id.clone(),
2243 username: u.username.clone(),
2244 email: u.email.clone(),
2245 role: format!("{:?}", u.role).to_lowercase(),
2246 mfa_enabled: u.mfa_enabled,
2247 enabled: true,
2248 created_at: u.created_at.clone(),
2249 last_login: None,
2250 })
2251 .collect();
2252 Json(user_list)
2253}
2254
/// JSON body accepted by `create_user`.
#[derive(Debug, Deserialize)]
pub struct CreateUserRequest {
    /// Login name of the new user.
    pub username: String,
    /// E-mail address of the new user.
    pub email: String,
    /// Password of the new user.
    pub password: String,
    /// Role name assigned to the new user.
    pub role: String,
}
2263
2264pub async fn create_user(
2266 State(state): State<AppState>,
2267 Json(request): Json<CreateUserRequest>,
2268) -> impl IntoResponse {
2269 state
2270 .activity
2271 .log_write(&format!("Create user: {}", request.username), None);
2272
2273 match state.auth.create_user(
2274 &request.username,
2275 &request.email,
2276 &request.password,
2277 &request.role,
2278 ) {
2279 Ok(user) => (
2280 StatusCode::CREATED,
2281 Json(serde_json::json!({"success": true, "user": user})),
2282 ),
2283 Err(e) => (
2284 StatusCode::BAD_REQUEST,
2285 Json(serde_json::json!({"success": false, "error": e})),
2286 ),
2287 }
2288}
2289
/// JSON body accepted by `update_user`; every field is optional.
#[derive(Debug, Deserialize)]
pub struct UpdateUserRequest {
    /// New e-mail address.
    pub email: Option<String>,
    /// New role name.
    pub role: Option<String>,
    /// New enabled flag.
    // NOTE(review): `update_user` does not forward this field to the auth service.
    pub enabled: Option<bool>,
    /// New password.
    pub password: Option<String>,
}
2298
2299pub async fn update_user(
2301 State(state): State<AppState>,
2302 Path(username): Path<String>,
2303 Json(request): Json<UpdateUserRequest>,
2304) -> impl IntoResponse {
2305 state
2306 .activity
2307 .log_write(&format!("Update user: {}", username), None);
2308
2309 match state
2310 .auth
2311 .update_user(&username, request.email, request.role, request.password)
2312 {
2313 Ok(user) => (
2314 StatusCode::OK,
2315 Json(serde_json::json!({"success": true, "user": user})),
2316 ),
2317 Err(e) => (
2318 StatusCode::NOT_FOUND,
2319 Json(serde_json::json!({"success": false, "error": e})),
2320 ),
2321 }
2322}
2323
2324pub async fn delete_user(
2326 State(state): State<AppState>,
2327 Path(username): Path<String>,
2328) -> impl IntoResponse {
2329 state
2330 .activity
2331 .log(ActivityType::Delete, &format!("Delete user: {}", username));
2332
2333 match state.auth.delete_user(&username) {
2334 Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2335 Err(e) => (
2336 StatusCode::NOT_FOUND,
2337 Json(serde_json::json!({"success": false, "error": e})),
2338 ),
2339 }
2340}
2341
/// One role as returned by `list_roles`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoleInfo {
    /// Role name.
    pub name: String,
    /// Role description.
    pub description: String,
    /// Permission names in lowercase.
    pub permissions: Vec<String>,
    /// Creation time formatted as `YYYY-MM-DDTHH:MM:SSZ`.
    pub created_at: String,
    /// `true` for the roles named `admin`, `operator`, `viewer` and `analyst`.
    pub is_builtin: bool,
}
2355
2356pub async fn list_roles(State(state): State<AppState>) -> Json<Vec<RoleInfo>> {
2358 state.activity.log(ActivityType::Query, "Listed roles");
2359 let roles = state.rbac.list_roles();
2360 let role_list: Vec<RoleInfo> = roles
2361 .iter()
2362 .map(|r| RoleInfo {
2363 name: r.name.clone(),
2364 description: r.description.clone(),
2365 permissions: r
2366 .permissions
2367 .iter()
2368 .map(|p| format!("{:?}", p).to_lowercase())
2369 .collect(),
2370 created_at: format_timestamp_ms(r.created_at),
2371 is_builtin: r.name == "admin"
2372 || r.name == "operator"
2373 || r.name == "viewer"
2374 || r.name == "analyst",
2375 })
2376 .collect();
2377 Json(role_list)
2378}
2379
/// JSON body accepted by `create_role`.
#[derive(Debug, Deserialize)]
pub struct CreateRoleRequest {
    /// Name of the role to create.
    pub name: String,
    /// Description of the role.
    pub description: String,
    /// Permission names; names that `parse_permissions` does not recognise are
    /// dropped.
    pub permissions: Vec<String>,
}
2387
2388pub async fn create_role(
2390 State(state): State<AppState>,
2391 Json(request): Json<CreateRoleRequest>,
2392) -> impl IntoResponse {
2393 state
2394 .activity
2395 .log_write(&format!("Create role: {}", request.name), None);
2396
2397 let permissions = parse_permissions(&request.permissions);
2399
2400 match state
2401 .rbac
2402 .create_role(&request.name, &request.description, permissions, "admin")
2403 {
2404 Ok(()) => {
2405 let role = state.rbac.get_role(&request.name);
2406 (
2407 StatusCode::CREATED,
2408 Json(
2409 serde_json::json!({"success": true, "role": request.name, "permissions": role.map(|r| r.permissions.len()).unwrap_or(0)}),
2410 ),
2411 )
2412 }
2413 Err(e) => (
2414 StatusCode::BAD_REQUEST,
2415 Json(serde_json::json!({"success": false, "error": e})),
2416 ),
2417 }
2418}
2419
2420pub async fn delete_role(
2422 State(state): State<AppState>,
2423 Path(name): Path<String>,
2424) -> impl IntoResponse {
2425 state
2426 .activity
2427 .log(ActivityType::Delete, &format!("Delete role: {}", name));
2428
2429 match state.rbac.delete_role(&name) {
2430 Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2431 Err(e) => (
2432 StatusCode::BAD_REQUEST,
2433 Json(serde_json::json!({"success": false, "error": e})),
2434 ),
2435 }
2436}
2437
2438fn parse_permissions(perms: &[String]) -> Vec<crate::auth::Permission> {
2440 use crate::auth::Permission;
2441 perms
2442 .iter()
2443 .filter_map(|p| match p.to_lowercase().as_str() {
2444 "database_create" | "databasecreate" => Some(Permission::DatabaseCreate),
2445 "database_drop" | "databasedrop" => Some(Permission::DatabaseDrop),
2446 "database_list" | "databaselist" => Some(Permission::DatabaseList),
2447 "table_create" | "tablecreate" => Some(Permission::TableCreate),
2448 "table_drop" | "tabledrop" => Some(Permission::TableDrop),
2449 "table_alter" | "tablealter" => Some(Permission::TableAlter),
2450 "table_list" | "tablelist" => Some(Permission::TableList),
2451 "data_select" | "dataselect" | "data:read" => Some(Permission::DataSelect),
2452 "data_insert" | "datainsert" | "data:write" => Some(Permission::DataInsert),
2453 "data_update" | "dataupdate" => Some(Permission::DataUpdate),
2454 "data_delete" | "datadelete" => Some(Permission::DataDelete),
2455 "user_create" | "usercreate" => Some(Permission::UserCreate),
2456 "user_delete" | "userdelete" => Some(Permission::UserDelete),
2457 "user_modify" | "usermodify" => Some(Permission::UserModify),
2458 "role_create" | "rolecreate" => Some(Permission::RoleCreate),
2459 "role_delete" | "roledelete" => Some(Permission::RoleDelete),
2460 "role_assign" | "roleassign" => Some(Permission::RoleAssign),
2461 "config_view" | "configview" => Some(Permission::ConfigView),
2462 "config_modify" | "configmodify" => Some(Permission::ConfigModify),
2463 "metrics_view" | "metricsview" => Some(Permission::MetricsView),
2464 "logs_view" | "logsview" => Some(Permission::LogsView),
2465 "backup_create" | "backupcreate" => Some(Permission::BackupCreate),
2466 "backup_restore" | "backuprestore" => Some(Permission::BackupRestore),
2467 "node_add" | "nodeadd" => Some(Permission::NodeAdd),
2468 "node_remove" | "noderemove" => Some(Permission::NodeRemove),
2469 "cluster_manage" | "clustermanage" => Some(Permission::ClusterManage),
2470 _ => None,
2471 })
2472 .collect()
2473}
2474
/// Formats a Unix timestamp given in milliseconds as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
///
/// Sub-second precision is discarded. The calendar date is derived with a
/// closed-form days-to-civil conversion for the proleptic Gregorian calendar,
/// so the cost does not depend on how far the timestamp is from 1970 and no
/// input value can panic. Years beyond 9999 are printed with as many digits as
/// they need.
fn format_timestamp_ms(timestamp_ms: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    // Days from 0000-03-01 to 1970-01-01.
    const EPOCH_SHIFT_DAYS: u64 = 719_468;
    // Days in one 400-year Gregorian cycle.
    const DAYS_PER_ERA: u64 = 146_097;

    let total_secs = timestamp_ms / 1000;
    let days_since_epoch = total_secs / SECS_PER_DAY;
    let secs_today = total_secs % SECS_PER_DAY;

    let hours = secs_today / 3600;
    let minutes = (secs_today % 3600) / 60;
    let seconds = secs_today % 60;

    // Work in a calendar whose year starts on 1 March, so the leap day is the
    // last day of the year and month lengths follow a fixed pattern.
    let shifted = days_since_epoch + EPOCH_SHIFT_DAYS;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // 0 = March, ..., 9 = December, 10 = January, 11 = February.
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };
    // January and February belong to the next civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hours, minutes, seconds
    )
}
2526
/// JSON body accepted by `get_metrics_timeseries`.
#[derive(Debug, Deserialize)]
pub struct MetricsTimeseriesRequest {
    /// Requested window: `1h`, `6h`, `24h`, `7d` or `30d`; any other value is
    /// treated as one hour.
    pub time_range: String,
}
2536
/// One sample of the server metrics history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsDataPoint {
    /// Unix timestamp of the sample in seconds.
    pub timestamp: i64,
    /// CPU usage in percent.
    pub cpu_percent: f64,
    /// Memory usage in percent.
    pub memory_percent: f64,
    /// Queries per second.
    pub queries_per_second: f64,
    /// Latency in milliseconds.
    pub latency_ms: f64,
    /// Connection count.
    pub connections: u64,
    /// Bytes received.
    pub bytes_in: u64,
    /// Bytes sent.
    pub bytes_out: u64,
}
2549
/// JSON response body produced by `get_metrics_timeseries`.
#[derive(Debug, Serialize)]
pub struct MetricsTimeseriesResponse {
    /// The `time_range` string from the request, echoed back unchanged.
    pub time_range: String,
    /// Samples whose timestamp falls inside the requested window.
    pub data_points: Vec<MetricsDataPoint>,
}
2556
2557pub async fn get_metrics_timeseries(
2559 State(state): State<AppState>,
2560 Json(request): Json<MetricsTimeseriesRequest>,
2561) -> Json<MetricsTimeseriesResponse> {
2562 state.activity.log(
2563 ActivityType::Query,
2564 &format!("Query metrics timeseries: {}", request.time_range),
2565 );
2566
2567 let range_secs: i64 = match request.time_range.as_str() {
2569 "1h" => 3600,
2570 "6h" => 6 * 3600,
2571 "24h" => 24 * 3600,
2572 "7d" => 7 * 24 * 3600,
2573 "30d" => 30 * 24 * 3600,
2574 _ => 3600,
2575 };
2576
2577 let history = state.metrics_history.read().await;
2579 let now = Utc::now().timestamp();
2580 let start_time = now - range_secs;
2581
2582 let data_points: Vec<MetricsDataPoint> = history
2584 .iter()
2585 .filter(|p| p.timestamp >= start_time)
2586 .cloned()
2587 .collect();
2588
2589 Json(MetricsTimeseriesResponse {
2590 time_range: request.time_range,
2591 data_points,
2592 })
2593}
2594
/// JSON request body accepted by `create_graph_node`.
#[derive(Debug, Deserialize)]
pub struct CreateNodeRequest {
    /// Label passed to the graph store's `create_node`.
    pub label: String,
    /// Arbitrary JSON value passed to the graph store as the node's properties.
    pub properties: serde_json::Value,
}
2605
/// JSON request body accepted by `create_graph_edge`.
#[derive(Debug, Deserialize)]
pub struct CreateEdgeRequest {
    /// Source endpoint passed to the graph store's `create_edge`
    /// (presumably a node id — confirm against the graph store).
    pub source: String,
    /// Target endpoint passed to the graph store's `create_edge`
    /// (presumably a node id — confirm against the graph store).
    pub target: String,
    /// Relationship name passed to the graph store's `create_edge`.
    pub relationship: String,
}
2613
2614pub async fn create_graph_node(
2616 State(state): State<AppState>,
2617 Json(request): Json<CreateNodeRequest>,
2618) -> impl IntoResponse {
2619 state
2620 .activity
2621 .log_write(&format!("Create graph node: {}", request.label), None);
2622
2623 let node = state
2624 .graph_store
2625 .create_node(&request.label, request.properties);
2626 (
2627 StatusCode::CREATED,
2628 Json(serde_json::json!({"success": true, "node": node})),
2629 )
2630}
2631
2632pub async fn create_graph_edge(
2634 State(state): State<AppState>,
2635 Json(request): Json<CreateEdgeRequest>,
2636) -> impl IntoResponse {
2637 state.activity.log_write(
2638 &format!(
2639 "Create graph edge: {} -> {}",
2640 request.source, request.target
2641 ),
2642 None,
2643 );
2644
2645 match state
2646 .graph_store
2647 .create_edge(&request.source, &request.target, &request.relationship)
2648 {
2649 Ok(edge) => (
2650 StatusCode::CREATED,
2651 Json(serde_json::json!({"success": true, "edge": edge})),
2652 ),
2653 Err(e) => (
2654 StatusCode::BAD_REQUEST,
2655 Json(serde_json::json!({"success": false, "error": e})),
2656 ),
2657 }
2658}
2659
2660pub async fn delete_graph_node(
2662 State(state): State<AppState>,
2663 Path(node_id): Path<String>,
2664) -> impl IntoResponse {
2665 state.activity.log(
2666 ActivityType::Delete,
2667 &format!("Delete graph node: {}", node_id),
2668 );
2669
2670 match state.graph_store.delete_node(&node_id) {
2671 Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2672 Err(e) => (
2673 StatusCode::NOT_FOUND,
2674 Json(serde_json::json!({"success": false, "error": e})),
2675 ),
2676 }
2677}
2678
2679pub async fn get_update_version(State(state): State<AppState>) -> impl IntoResponse {
2685 let version = aegis_updates::version::VERSION;
2686 let node_name = state
2687 .config
2688 .node_name
2689 .clone()
2690 .unwrap_or_else(|| "unknown".to_string());
2691
2692 (
2693 StatusCode::OK,
2694 Json(serde_json::json!({
2695 "success": true,
2696 "version": version,
2697 "node_id": state.config.node_id,
2698 "node_name": node_name,
2699 })),
2700 )
2701}
2702
/// JSON request body accepted by `create_update_plan`. All three fields are
/// handed to the update orchestrator's `create_plan` unchanged.
#[derive(serde::Deserialize)]
pub struct CreateUpdatePlanRequest {
    /// Version the plan targets; also written to the activity log.
    pub version: String,
    /// Location of the update binary (presumably a download URL — confirm
    /// against the orchestrator).
    pub binary_url: String,
    /// Presumably the expected SHA-256 digest of the binary — confirm against
    /// the orchestrator.
    pub sha256: String,
}
2710
2711pub async fn create_update_plan(
2712 State(state): State<AppState>,
2713 Json(request): Json<CreateUpdatePlanRequest>,
2714) -> impl IntoResponse {
2715 state.activity.log_system(&format!(
2716 "Creating update plan for version {}",
2717 request.version
2718 ));
2719
2720 let peers = state.admin.get_peers();
2722 let mut nodes = vec![aegis_updates::orchestrator::ClusterNode {
2723 node_id: state.config.node_id.clone(),
2724 name: state
2725 .config
2726 .node_name
2727 .clone()
2728 .unwrap_or_else(|| "self".to_string()),
2729 address: format!("http://{}:{}", state.config.host, state.config.port),
2730 role: "leader".to_string(),
2731 }];
2732 for peer in &peers {
2733 nodes.push(aegis_updates::orchestrator::ClusterNode {
2734 node_id: peer.id.clone(),
2735 name: peer.name.clone().unwrap_or_else(|| peer.id.clone()),
2736 address: peer.address.clone(),
2737 role: "follower".to_string(),
2738 });
2739 }
2740 state.update_orchestrator.set_cluster_nodes(nodes).await;
2741
2742 let plan = state
2743 .update_orchestrator
2744 .create_plan(request.version, request.binary_url, request.sha256)
2745 .await;
2746
2747 (
2748 StatusCode::CREATED,
2749 Json(serde_json::json!({
2750 "success": true,
2751 "plan": plan,
2752 })),
2753 )
2754}
2755
/// JSON request body accepted by `execute_update_plan`.
#[derive(serde::Deserialize)]
pub struct ExecuteUpdateRequest {
    /// Identifier of the plan to run; passed to the orchestrator's
    /// `execute_plan`.
    pub plan_id: String,
}
2761
2762pub async fn execute_update_plan(
2763 State(state): State<AppState>,
2764 Json(request): Json<ExecuteUpdateRequest>,
2765) -> impl IntoResponse {
2766 state
2767 .activity
2768 .log_system(&format!("Executing update plan {}", request.plan_id));
2769
2770 match state
2771 .update_orchestrator
2772 .execute_plan(&request.plan_id)
2773 .await
2774 {
2775 Ok(()) => (
2776 StatusCode::OK,
2777 Json(serde_json::json!({
2778 "success": true,
2779 "message": "Update completed successfully",
2780 })),
2781 ),
2782 Err(e) => (
2783 StatusCode::INTERNAL_SERVER_ERROR,
2784 Json(serde_json::json!({
2785 "success": false,
2786 "error": e.to_string(),
2787 })),
2788 ),
2789 }
2790}
2791
2792pub async fn get_update_status(
2794 State(state): State<AppState>,
2795 Path(plan_id): Path<String>,
2796) -> impl IntoResponse {
2797 match state.update_orchestrator.get_plan(&plan_id).await {
2798 Some(plan) => (
2799 StatusCode::OK,
2800 Json(serde_json::json!({
2801 "success": true,
2802 "plan": plan,
2803 })),
2804 ),
2805 None => (
2806 StatusCode::NOT_FOUND,
2807 Json(serde_json::json!({
2808 "success": false,
2809 "error": format!("Plan {} not found", plan_id),
2810 })),
2811 ),
2812 }
2813}
2814
2815pub async fn list_update_plans(State(state): State<AppState>) -> impl IntoResponse {
2817 let plans = state.update_orchestrator.list_plans().await;
2818
2819 (
2820 StatusCode::OK,
2821 Json(serde_json::json!({
2822 "success": true,
2823 "plans": plans,
2824 })),
2825 )
2826}