Skip to main content

aegis_server/
handlers.rs

1//! Aegis Request Handlers
2//!
3//! HTTP request handlers for the REST API. Implements endpoints for
4//! query execution, health checks, and administrative operations.
5//! All handlers use real engine integrations - no mock data.
6//!
7//! @version 0.1.0
8//! @author AutomataNexus Development Team
9
10use crate::activity::{Activity, ActivityType};
11use crate::admin::{
12    AlertInfo, AlertSeverity, ClusterInfo, DashboardSummary, NodeInfo, QueryStats, StorageInfo,
13};
14use crate::auth::{LoginRequest, MfaVerifyRequest, UserInfo};
15use crate::state::{AppState, GraphEdge, GraphNode, KvEntry, QueryError, QueryResult};
16use aegis_document::{Document, DocumentId, Query as DocQuery, QueryResult as DocQueryResult};
17use aegis_streaming::{event::EventData, ChannelId, Event, EventType as StreamEventType};
18use aegis_timeseries::{DataPoint, Metric, MetricType, Tags, TimeSeriesQuery};
19use axum::{
20    extract::{Path, State},
21    http::StatusCode,
22    response::IntoResponse,
23    Json,
24};
25use chrono::{Duration, Utc};
26use serde::{Deserialize, Serialize};
27use std::time::Instant;
28
29// =============================================================================
30// Health Check
31// =============================================================================
32
33/// Health check response.
34#[derive(Debug, Serialize)]
35pub struct HealthResponse {
36    pub status: String,
37    pub version: String,
38}
39
40/// Health check endpoint.
41pub async fn health_check() -> Json<HealthResponse> {
42    Json(HealthResponse {
43        status: "healthy".to_string(),
44        version: env!("CARGO_PKG_VERSION").to_string(),
45    })
46}
47
48// =============================================================================
49// Query Endpoints
50// =============================================================================
51
52/// Query request body.
53#[derive(Debug, Deserialize)]
54pub struct QueryRequest {
55    /// Target database name (optional, defaults to "default")
56    #[serde(default)]
57    pub database: Option<String>,
58    pub sql: String,
59    #[serde(default)]
60    pub params: Vec<serde_json::Value>,
61}
62
63/// Query response.
64#[derive(Debug, Serialize)]
65pub struct QueryResponse {
66    pub success: bool,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub data: Option<QueryResult>,
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub error: Option<String>,
71    pub execution_time_ms: u64,
72}
73
74/// Execute a SQL query.
75pub async fn execute_query(
76    State(state): State<AppState>,
77    headers: axum::http::HeaderMap,
78    Json(request): Json<QueryRequest>,
79) -> impl IntoResponse {
80    let start = Instant::now();
81    let is_replicated = headers.get("x-aegis-replicated").is_some();
82
83    let result = if is_replicated {
84        // Replicated query — execute locally, don't re-replicate
85        state
86            .execute_query_replicated(&request.sql, request.database.as_deref())
87            .await
88    } else if !request.params.is_empty() {
89        state
90            .execute_query_with_params(&request.sql, request.database.as_deref(), &request.params)
91            .await
92    } else {
93        state
94            .execute_query(&request.sql, request.database.as_deref())
95            .await
96    };
97    let duration_ms = start.elapsed().as_millis() as u64;
98
99    match result {
100        Ok(data) => {
101            state.record_request(duration_ms, true).await;
102            (
103                StatusCode::OK,
104                Json(QueryResponse {
105                    success: true,
106                    data: Some(data),
107                    error: None,
108                    execution_time_ms: duration_ms,
109                }),
110            )
111        }
112        Err(e) => {
113            state.record_request(duration_ms, false).await;
114            let (status, client_msg) = match &e {
115                QueryError::Parse(_) => (StatusCode::BAD_REQUEST, "Query syntax error"),
116                QueryError::Plan(_) => (StatusCode::BAD_REQUEST, "Query planning error"),
117                QueryError::Execute(_) => {
118                    (StatusCode::INTERNAL_SERVER_ERROR, "Query execution error")
119                }
120            };
121            tracing::warn!("Query failed: {}", e);
122            (
123                status,
124                Json(QueryResponse {
125                    success: false,
126                    data: None,
127                    error: Some(client_msg.to_string()),
128                    execution_time_ms: duration_ms,
129                }),
130            )
131        }
132    }
133}
134
135// =============================================================================
136// Table Endpoints
137// =============================================================================
138
139/// List tables response.
140#[derive(Debug, Serialize)]
141pub struct TablesResponse {
142    pub tables: Vec<TableInfo>,
143}
144
145/// Table information.
146#[derive(Debug, Serialize)]
147pub struct TableInfo {
148    pub name: String,
149    pub columns: Vec<ColumnInfo>,
150    pub row_count: Option<u64>,
151}
152
153/// Column information.
154#[derive(Debug, Serialize)]
155pub struct ColumnInfo {
156    pub name: String,
157    pub data_type: String,
158    pub nullable: bool,
159}
160
161/// List all tables (from default database, use query endpoint with database param for others).
162pub async fn list_tables(State(state): State<AppState>) -> Json<TablesResponse> {
163    let table_names = state.query_engine.list_tables(None);
164    let tables: Vec<TableInfo> = table_names
165        .into_iter()
166        .filter_map(|name| state.query_engine.get_table_info(&name, None))
167        .map(|info| TableInfo {
168            name: info.name,
169            columns: info
170                .columns
171                .into_iter()
172                .map(|c| ColumnInfo {
173                    name: c.name,
174                    data_type: c.data_type,
175                    nullable: c.nullable,
176                })
177                .collect(),
178            row_count: info.row_count,
179        })
180        .collect();
181    Json(TablesResponse { tables })
182}
183
184/// Get table details (from default database).
185pub async fn get_table(
186    State(state): State<AppState>,
187    Path(name): Path<String>,
188) -> impl IntoResponse {
189    match state.query_engine.get_table_info(&name, None) {
190        Some(info) => Json(TableInfo {
191            name: info.name,
192            columns: info
193                .columns
194                .into_iter()
195                .map(|c| ColumnInfo {
196                    name: c.name,
197                    data_type: c.data_type,
198                    nullable: c.nullable,
199                })
200                .collect(),
201            row_count: info.row_count,
202        }),
203        None => Json(TableInfo {
204            name,
205            columns: vec![],
206            row_count: None,
207        }),
208    }
209}
210
211// =============================================================================
212// Metrics Endpoint
213// =============================================================================
214
215/// Metrics response.
216#[derive(Debug, Serialize)]
217pub struct MetricsResponse {
218    pub total_requests: u64,
219    pub failed_requests: u64,
220    pub avg_duration_ms: f64,
221    pub success_rate: f64,
222}
223
224/// Get server metrics.
225pub async fn get_metrics(State(state): State<AppState>) -> Json<MetricsResponse> {
226    let metrics = state.metrics.read().await;
227    Json(MetricsResponse {
228        total_requests: metrics.total_requests,
229        failed_requests: metrics.failed_requests,
230        avg_duration_ms: metrics.avg_duration_ms(),
231        success_rate: metrics.success_rate(),
232    })
233}
234
235// =============================================================================
236// Error Response
237// =============================================================================
238
239/// Generic error response.
240#[derive(Debug, Serialize)]
241pub struct ErrorResponse {
242    pub error: String,
243    pub code: String,
244}
245
246impl ErrorResponse {
247    pub fn new(error: impl ToString, code: impl ToString) -> Self {
248        Self {
249            error: error.to_string(),
250            code: code.to_string(),
251        }
252    }
253}
254
255/// Not found handler.
256pub async fn not_found() -> impl IntoResponse {
257    (
258        StatusCode::NOT_FOUND,
259        Json(ErrorResponse::new("Not found", "NOT_FOUND")),
260    )
261}
262
263// =============================================================================
264// Admin Endpoints
265// =============================================================================
266
267/// Get cluster information.
268pub async fn get_cluster_info(State(state): State<AppState>) -> Json<ClusterInfo> {
269    Json(state.admin.get_cluster_info())
270}
271
272/// Get dashboard summary.
273pub async fn get_dashboard_summary(State(state): State<AppState>) -> Json<DashboardSummary> {
274    Json(state.admin.get_dashboard_summary())
275}
276
277/// Get all nodes.
278pub async fn get_nodes(State(state): State<AppState>) -> Json<Vec<NodeInfo>> {
279    Json(state.admin.get_nodes())
280}
281
282// =============================================================================
283// Cluster Peer Management
284// =============================================================================
285
286/// Request to join a cluster.
287#[derive(Debug, Deserialize)]
288pub struct JoinClusterRequest {
289    pub node_id: String,
290    pub node_name: Option<String>,
291    pub address: String,
292}
293
294/// Response from joining a cluster.
295#[derive(Debug, Serialize)]
296pub struct JoinClusterResponse {
297    pub success: bool,
298    pub message: String,
299    pub peers: Vec<PeerInfo>,
300}
301
302/// Peer info for cluster responses.
303#[derive(Debug, Serialize)]
304pub struct PeerInfo {
305    pub id: String,
306    pub name: Option<String>,
307    pub address: String,
308}
309
310/// Get this node's info for peer discovery.
311pub async fn get_node_info(State(state): State<AppState>) -> Json<crate::admin::PeerNode> {
312    Json(state.admin.get_self_info())
313}
314
315/// Join/register with this node (called by other nodes).
316pub async fn cluster_join(
317    State(state): State<AppState>,
318    Json(req): Json<JoinClusterRequest>,
319) -> Json<JoinClusterResponse> {
320    use crate::admin::{NodeRole, NodeStatus, PeerNode};
321
322    // Register the requesting node as a peer
323    let peer = PeerNode {
324        id: req.node_id.clone(),
325        name: req.node_name.clone(),
326        address: req.address.clone(),
327        status: NodeStatus::Online,
328        role: NodeRole::Follower,
329        last_seen: std::time::SystemTime::now()
330            .duration_since(std::time::UNIX_EPOCH)
331            .unwrap_or_default()
332            .as_millis() as u64,
333        version: env!("CARGO_PKG_VERSION").to_string(),
334        uptime_seconds: 0,
335        metrics: None,
336    };
337
338    state.admin.register_peer(peer);
339    state.admin.add_peer_address(req.address.clone());
340
341    tracing::info!(
342        "Node joined cluster: {} ({}) at {}",
343        req.node_id,
344        req.node_name.as_deref().unwrap_or("unnamed"),
345        req.address
346    );
347
348    // Return list of all known peers (including self)
349    let self_info = state.admin.get_self_info();
350    let mut peers = vec![PeerInfo {
351        id: self_info.id,
352        name: self_info.name,
353        address: self_info.address,
354    }];
355
356    for peer in state.admin.get_peers() {
357        if peer.id != req.node_id {
358            peers.push(PeerInfo {
359                id: peer.id,
360                name: peer.name,
361                address: peer.address,
362            });
363        }
364    }
365
366    Json(JoinClusterResponse {
367        success: true,
368        message: "Successfully joined cluster".to_string(),
369        peers,
370    })
371}
372
373/// Heartbeat from a peer node.
374#[derive(Debug, Deserialize)]
375pub struct HeartbeatRequest {
376    pub node_id: String,
377    pub node_name: Option<String>,
378    pub address: String,
379    pub uptime_seconds: u64,
380    pub metrics: Option<crate::admin::NodeMetrics>,
381}
382
383/// Receive heartbeat from a peer.
384pub async fn cluster_heartbeat(
385    State(state): State<AppState>,
386    Json(req): Json<HeartbeatRequest>,
387) -> Json<serde_json::Value> {
388    use crate::admin::{NodeRole, NodeStatus, PeerNode};
389
390    // Update peer info
391    let peer = PeerNode {
392        id: req.node_id.clone(),
393        name: req.node_name,
394        address: req.address,
395        status: NodeStatus::Online,
396        role: NodeRole::Follower,
397        last_seen: std::time::SystemTime::now()
398            .duration_since(std::time::UNIX_EPOCH)
399            .unwrap_or_default()
400            .as_millis() as u64,
401        version: env!("CARGO_PKG_VERSION").to_string(),
402        uptime_seconds: req.uptime_seconds,
403        metrics: req.metrics,
404    };
405
406    state.admin.register_peer(peer);
407
408    Json(serde_json::json!({
409        "success": true,
410        "timestamp": std::time::SystemTime::now()
411            .duration_since(std::time::UNIX_EPOCH)
412            .unwrap_or_default()
413            .as_millis()
414    }))
415}
416
417/// Get list of known peers.
418pub async fn get_peers(State(state): State<AppState>) -> Json<Vec<crate::admin::PeerNode>> {
419    Json(state.admin.get_peers())
420}
421
422/// Get storage information.
423pub async fn get_storage_info(State(state): State<AppState>) -> Json<StorageInfo> {
424    Json(state.admin.get_storage_info())
425}
426
427/// Get query statistics.
428pub async fn get_query_stats(State(state): State<AppState>) -> Json<QueryStats> {
429    Json(state.admin.get_query_stats())
430}
431
432/// Get database statistics (key counts, document counts, etc.)
433/// Use ?local=true to get only local stats (used by peer aggregation to avoid loops)
434pub async fn get_database_stats(
435    State(state): State<AppState>,
436    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
437) -> Json<crate::state::DatabaseStats> {
438    // Start with local stats
439    let mut stats = state.get_database_stats();
440
441    // If local=true, return only local stats (prevents infinite recursion when peers call each other)
442    if params.get("local").map(|v| v == "true").unwrap_or(false) {
443        return Json(stats);
444    }
445
446    // Aggregate stats from all cluster peers (call with ?local=true to prevent loops)
447    let peers = state.admin.get_peers();
448    let client = reqwest::Client::new();
449
450    for peer in peers {
451        let url = format!("http://{}/api/v1/admin/database?local=true", peer.address);
452        if let Ok(response) = client
453            .get(&url)
454            .timeout(std::time::Duration::from_secs(2))
455            .send()
456            .await
457        {
458            if let Ok(peer_stats) = response.json::<crate::state::DatabaseStats>().await {
459                stats.total_keys += peer_stats.total_keys;
460                stats.total_documents += peer_stats.total_documents;
461                stats.collection_count += peer_stats.collection_count;
462                stats.documents_inserted += peer_stats.documents_inserted;
463                stats.documents_updated += peer_stats.documents_updated;
464                stats.documents_deleted += peer_stats.documents_deleted;
465                stats.queries_executed += peer_stats.queries_executed;
466            }
467        }
468    }
469
470    Json(stats)
471}
472
473/// Alert response structure.
474#[derive(Debug, Serialize)]
475pub struct AlertsResponse {
476    pub alerts: Vec<AlertInfo>,
477}
478
479/// Get active alerts based on real system conditions.
480pub async fn get_alerts(State(_state): State<AppState>) -> Json<AlertsResponse> {
481    use sysinfo::{Disks, System};
482
483    let mut alerts = Vec::new();
484    let now = std::time::SystemTime::now()
485        .duration_since(std::time::UNIX_EPOCH)
486        .unwrap_or_default()
487        .as_millis() as u64;
488
489    // Check memory usage
490    let mut sys = System::new();
491    sys.refresh_memory();
492
493    let memory_total = sys.total_memory();
494    let memory_used = sys.used_memory();
495    if memory_total > 0 {
496        let memory_percent = (memory_used as f64 / memory_total as f64) * 100.0;
497        if memory_percent > 90.0 {
498            alerts.push(AlertInfo {
499                id: "mem-critical".to_string(),
500                severity: AlertSeverity::Critical,
501                source: "system".to_string(),
502                message: format!("Critical memory usage: {:.1}%", memory_percent),
503                timestamp: now,
504                acknowledged: false,
505                resolved: false,
506            });
507        } else if memory_percent > 80.0 {
508            alerts.push(AlertInfo {
509                id: "mem-warning".to_string(),
510                severity: AlertSeverity::Warning,
511                source: "system".to_string(),
512                message: format!("High memory usage: {:.1}%", memory_percent),
513                timestamp: now,
514                acknowledged: false,
515                resolved: false,
516            });
517        }
518    }
519
520    // Check disk usage
521    let disks = Disks::new_with_refreshed_list();
522    for disk in disks.list() {
523        let total = disk.total_space();
524        let available = disk.available_space();
525        if total > 0 {
526            let used_percent = ((total - available) as f64 / total as f64) * 100.0;
527            let mount = disk.mount_point().to_string_lossy();
528            if used_percent > 95.0 {
529                alerts.push(AlertInfo {
530                    id: format!("disk-critical-{}", mount.replace("/", "_")),
531                    severity: AlertSeverity::Critical,
532                    source: "system".to_string(),
533                    message: format!("Critical disk usage on {}: {:.1}%", mount, used_percent),
534                    timestamp: now,
535                    acknowledged: false,
536                    resolved: false,
537                });
538            } else if used_percent > 85.0 {
539                alerts.push(AlertInfo {
540                    id: format!("disk-warning-{}", mount.replace("/", "_")),
541                    severity: AlertSeverity::Warning,
542                    source: "system".to_string(),
543                    message: format!("High disk usage on {}: {:.1}%", mount, used_percent),
544                    timestamp: now,
545                    acknowledged: false,
546                    resolved: false,
547                });
548            }
549        }
550    }
551
552    Json(AlertsResponse { alerts })
553}
554
555// =============================================================================
556// Authentication Endpoints
557// =============================================================================
558
559/// Login endpoint.
560pub async fn login(
561    State(state): State<AppState>,
562    Json(request): Json<LoginRequest>,
563) -> impl IntoResponse {
564    let response = state.auth.login(&request.username, &request.password);
565
566    if response.error.is_some() {
567        state.activity.log_auth(
568            &format!("Failed login attempt for user: {}", request.username),
569            Some(&request.username),
570        );
571        (StatusCode::UNAUTHORIZED, Json(response))
572    } else if response.requires_mfa == Some(true) {
573        state.activity.log_auth(
574            &format!("MFA required for user: {}", request.username),
575            Some(&request.username),
576        );
577        (StatusCode::OK, Json(response))
578    } else {
579        state.activity.log_auth(
580            &format!("User logged in: {}", request.username),
581            Some(&request.username),
582        );
583        (StatusCode::OK, Json(response))
584    }
585}
586
587/// MFA verification endpoint.
588pub async fn verify_mfa(
589    State(state): State<AppState>,
590    Json(request): Json<MfaVerifyRequest>,
591) -> impl IntoResponse {
592    let response = state.auth.verify_mfa(&request.code, &request.token);
593
594    if response.error.is_some() {
595        state.activity.log_auth("Failed MFA verification", None);
596        (StatusCode::UNAUTHORIZED, Json(response))
597    } else {
598        let username = response.user.as_ref().map(|u| u.username.as_str());
599        state.activity.log_auth(
600            &format!("MFA verified for user: {}", username.unwrap_or("unknown")),
601            username,
602        );
603        (StatusCode::OK, Json(response))
604    }
605}
606
607/// Logout request.
608#[derive(Debug, Deserialize)]
609pub struct LogoutRequest {
610    pub token: String,
611}
612
613/// Logout response.
614#[derive(Debug, Serialize)]
615pub struct LogoutResponse {
616    pub success: bool,
617}
618
619/// Logout endpoint.
620pub async fn logout(
621    State(state): State<AppState>,
622    Json(request): Json<LogoutRequest>,
623) -> Json<LogoutResponse> {
624    let success = state.auth.logout(&request.token);
625
626    if success {
627        state.activity.log_auth("User logged out", None);
628    }
629
630    Json(LogoutResponse { success })
631}
632
633/// Validate session endpoint.
634pub async fn validate_session(
635    State(state): State<AppState>,
636    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
637) -> impl IntoResponse {
638    let token = params.get("token").map(|s| s.as_str()).unwrap_or("");
639
640    match state.auth.validate_session(token) {
641        Some(user) => {
642            let user_info: UserInfo = user;
643            (StatusCode::OK, Json(Some(user_info)))
644        }
645        None => (StatusCode::UNAUTHORIZED, Json(None::<UserInfo>)),
646    }
647}
648
649/// Get current user endpoint.
650pub async fn get_current_user(
651    State(state): State<AppState>,
652    headers: axum::http::HeaderMap,
653) -> impl IntoResponse {
654    let auth_header = headers
655        .get("authorization")
656        .and_then(|v| v.to_str().ok())
657        .unwrap_or("");
658
659    let token = auth_header.strip_prefix("Bearer ").unwrap_or(auth_header);
660
661    match state.auth.validate_session(token) {
662        Some(user) => {
663            let user_info: UserInfo = user;
664            (StatusCode::OK, Json(Some(user_info)))
665        }
666        None => (StatusCode::UNAUTHORIZED, Json(None::<UserInfo>)),
667    }
668}
669
670// =============================================================================
671// Activity Endpoints
672// =============================================================================
673
674/// Activity query parameters.
675#[derive(Debug, Deserialize)]
676pub struct ActivityQuery {
677    #[serde(default = "default_limit")]
678    pub limit: usize,
679    pub activity_type: Option<String>,
680    pub user: Option<String>,
681}
682
683fn default_limit() -> usize {
684    50
685}
686
687/// Get recent activities.
688pub async fn get_activities(
689    State(state): State<AppState>,
690    axum::extract::Query(params): axum::extract::Query<ActivityQuery>,
691) -> Json<Vec<Activity>> {
692    let activities = if let Some(ref user) = params.user {
693        state.activity.get_by_user(user, params.limit)
694    } else if let Some(ref activity_type) = params.activity_type {
695        let at = match activity_type.as_str() {
696            "query" => ActivityType::Query,
697            "write" => ActivityType::Write,
698            "delete" => ActivityType::Delete,
699            "config" => ActivityType::Config,
700            "node" => ActivityType::Node,
701            "auth" => ActivityType::Auth,
702            _ => ActivityType::System,
703        };
704        state.activity.get_by_type(at, params.limit)
705    } else {
706        state.activity.get_recent(params.limit)
707    };
708
709    Json(activities)
710}
711
712// =============================================================================
713// Key-Value Store Endpoints (REAL IMPLEMENTATION)
714// =============================================================================
715
716/// List keys response.
717#[derive(Debug, Serialize)]
718pub struct ListKeysResponse {
719    pub keys: Vec<KvEntry>,
720    pub total: usize,
721}
722
723/// List all keys - uses real KvStore.
724pub async fn list_keys(
725    State(state): State<AppState>,
726    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
727) -> Json<ListKeysResponse> {
728    let limit = params
729        .get("limit")
730        .and_then(|s| s.parse().ok())
731        .unwrap_or(100);
732    let prefix = params.get("prefix").map(|s| s.as_str());
733
734    state.activity.log(ActivityType::Query, "Listed keys");
735
736    let keys = state.kv_store.list(prefix, limit);
737    let total = keys.len();
738
739    Json(ListKeysResponse { keys, total })
740}
741
742/// Set key request.
743#[derive(Debug, Deserialize)]
744pub struct SetKeyRequest {
745    pub key: String,
746    pub value: serde_json::Value,
747    pub ttl: Option<u64>,
748}
749
750/// Set a key's value - uses real KvStore.
751pub async fn set_key(
752    State(state): State<AppState>,
753    Json(request): Json<SetKeyRequest>,
754) -> Json<KvEntry> {
755    state
756        .activity
757        .log_write(&format!("Set key: {}", request.key), None);
758    let entry = state.kv_store.set(request.key, request.value, request.ttl);
759    Json(entry)
760}
761
762/// Get a specific key.
763pub async fn get_key(State(state): State<AppState>, Path(key): Path<String>) -> impl IntoResponse {
764    match state.kv_store.get(&key) {
765        Some(entry) => (StatusCode::OK, Json(Some(entry))),
766        None => (StatusCode::NOT_FOUND, Json(None)),
767    }
768}
769
770/// Delete a key - uses real KvStore.
771pub async fn delete_key(
772    State(state): State<AppState>,
773    Path(key): Path<String>,
774) -> impl IntoResponse {
775    state
776        .activity
777        .log(ActivityType::Delete, &format!("Delete key: {}", key));
778    match state.kv_store.delete(&key) {
779        Some(_) => (
780            StatusCode::OK,
781            Json(serde_json::json!({"success": true, "key": key})),
782        ),
783        None => (
784            StatusCode::NOT_FOUND,
785            Json(serde_json::json!({"success": false, "error": "Key not found"})),
786        ),
787    }
788}
789
790// =============================================================================
791// Document Store Endpoints (REAL IMPLEMENTATION)
792// =============================================================================
793
794/// Collection info response.
795#[derive(Debug, Serialize)]
796pub struct CollectionInfoResponse {
797    pub name: String,
798    pub document_count: usize,
799    pub index_count: usize,
800}
801
802/// List collections - uses real DocumentEngine.
803pub async fn list_collections(State(state): State<AppState>) -> Json<Vec<CollectionInfoResponse>> {
804    state
805        .activity
806        .log(ActivityType::Query, "Listed collections");
807
808    let collection_names = state.document_engine.list_collections();
809    let collections: Vec<CollectionInfoResponse> = collection_names
810        .iter()
811        .filter_map(|name| {
812            state
813                .document_engine
814                .collection_stats(name)
815                .map(|stats| CollectionInfoResponse {
816                    name: stats.name,
817                    document_count: stats.document_count,
818                    index_count: stats.index_count,
819                })
820        })
821        .collect();
822
823    Json(collections)
824}
825
826/// Document response.
827#[derive(Debug, Serialize)]
828pub struct DocumentResponse {
829    pub id: String,
830    pub collection: String,
831    pub data: serde_json::Value,
832}
833
834/// Collection query response with full result information.
835#[derive(Debug, Serialize)]
836pub struct CollectionQueryResponse {
837    pub documents: Vec<DocumentResponse>,
838    pub total_scanned: usize,
839    pub execution_time_ms: u64,
840}
841
842/// Get documents in a collection - uses real DocumentEngine.
843pub async fn get_collection_documents(
844    State(state): State<AppState>,
845    Path(collection): Path<String>,
846) -> impl IntoResponse {
847    state.activity.log(
848        ActivityType::Query,
849        &format!("Query collection: {}", collection),
850    );
851
852    // Use find with empty query to get all documents
853    let query = DocQuery::new();
854    match state.document_engine.find(&collection, &query) {
855        Ok(result) => {
856            // Explicit type annotation to use DocQueryResult
857            let query_result: &DocQueryResult = &result;
858            let docs: Vec<DocumentResponse> = query_result
859                .documents
860                .iter()
861                .map(|doc| DocumentResponse {
862                    id: doc.id.to_string(),
863                    collection: collection.clone(),
864                    data: doc_to_json(doc),
865                })
866                .collect();
867            let response = CollectionQueryResponse {
868                documents: docs,
869                total_scanned: query_result.total_scanned,
870                execution_time_ms: query_result.execution_time_ms,
871            };
872            (StatusCode::OK, Json(response))
873        }
874        Err(_e) => {
875            let empty = CollectionQueryResponse {
876                documents: vec![],
877                total_scanned: 0,
878                execution_time_ms: 0,
879            };
880            (StatusCode::NOT_FOUND, Json(empty))
881        }
882    }
883}
884
885/// Get a single document by ID.
886pub async fn get_document(
887    State(state): State<AppState>,
888    Path((collection, id)): Path<(String, String)>,
889) -> impl IntoResponse {
890    state.activity.log(
891        ActivityType::Query,
892        &format!("Get document: {}/{}", collection, id),
893    );
894
895    let doc_id = DocumentId::new(&id);
896    match state.document_engine.get(&collection, &doc_id) {
897        Ok(Some(doc)) => {
898            let response = DocumentResponse {
899                id: doc.id.to_string(),
900                collection: collection.clone(),
901                data: doc_to_json(&doc),
902            };
903            (StatusCode::OK, Json(serde_json::json!(response)))
904        }
905        Ok(None) => (
906            StatusCode::NOT_FOUND,
907            Json(serde_json::json!({"error": "Document not found"})),
908        ),
909        Err(e) => (
910            StatusCode::BAD_REQUEST,
911            Json(serde_json::json!({"error": e.to_string()})),
912        ),
913    }
914}
915
916/// Delete a document from a collection.
917pub async fn delete_document(
918    State(state): State<AppState>,
919    Path((collection, id)): Path<(String, String)>,
920) -> impl IntoResponse {
921    state.activity.log(
922        ActivityType::Delete,
923        &format!("Delete document: {}/{}", collection, id),
924    );
925
926    let doc_id = DocumentId::new(&id);
927    match state.document_engine.delete(&collection, &doc_id) {
928        Ok(doc) => {
929            state.flush_collection(&collection);
930            let response = DocumentResponse {
931                id: doc.id.to_string(),
932                collection: collection.clone(),
933                data: doc_to_json(&doc),
934            };
935            (
936                StatusCode::OK,
937                Json(serde_json::json!({"success": true, "deleted": response})),
938            )
939        }
940        Err(e) => (
941            StatusCode::NOT_FOUND,
942            Json(serde_json::json!({"success": false, "error": e.to_string()})),
943        ),
944    }
945}
946
947/// Update document request.
948#[derive(Debug, Deserialize)]
949pub struct UpdateDocumentRequest {
950    pub document: serde_json::Value,
951}
952
953/// Update a document in a collection (full replacement).
954pub async fn update_document(
955    State(state): State<AppState>,
956    Path((collection, id)): Path<(String, String)>,
957    Json(request): Json<UpdateDocumentRequest>,
958) -> impl IntoResponse {
959    state
960        .activity
961        .log_write(&format!("Update document: {}/{}", collection, id), None);
962
963    let doc_id = DocumentId::new(&id);
964
965    // Convert JSON to Document, preserving the ID
966    let mut doc = json_to_doc(request.document);
967    doc.id = doc_id.clone();
968
969    match state.document_engine.update(&collection, &doc_id, doc) {
970        Ok(()) => {
971            state.flush_collection(&collection);
972            // Fetch the updated document to return it
973            match state.document_engine.get(&collection, &doc_id) {
974                Ok(Some(updated_doc)) => {
975                    let response = DocumentResponse {
976                        id: updated_doc.id.to_string(),
977                        collection: collection.clone(),
978                        data: doc_to_json(&updated_doc),
979                    };
980                    (
981                        StatusCode::OK,
982                        Json(serde_json::json!({"success": true, "document": response})),
983                    )
984                }
985                _ => (
986                    StatusCode::OK,
987                    Json(serde_json::json!({"success": true, "id": id})),
988                ),
989            }
990        }
991        Err(e) => (
992            StatusCode::NOT_FOUND,
993            Json(serde_json::json!({"success": false, "error": e.to_string()})),
994        ),
995    }
996}
997
998/// Partially update a document (merge fields).
999pub async fn patch_document(
1000    State(state): State<AppState>,
1001    Path((collection, id)): Path<(String, String)>,
1002    Json(request): Json<UpdateDocumentRequest>,
1003) -> impl IntoResponse {
1004    state
1005        .activity
1006        .log_write(&format!("Patch document: {}/{}", collection, id), None);
1007
1008    let doc_id = DocumentId::new(&id);
1009
1010    // First get the existing document
1011    let existing = match state.document_engine.get(&collection, &doc_id) {
1012        Ok(Some(doc)) => doc,
1013        Ok(None) => {
1014            return (
1015                StatusCode::NOT_FOUND,
1016                Json(serde_json::json!({"success": false, "error": "Document not found"})),
1017            );
1018        }
1019        Err(e) => {
1020            return (
1021                StatusCode::BAD_REQUEST,
1022                Json(serde_json::json!({"success": false, "error": e.to_string()})),
1023            );
1024        }
1025    };
1026
1027    // Merge the patch into the existing document
1028    let mut updated_doc = existing.clone();
1029    if let serde_json::Value::Object(patch_map) = request.document {
1030        for (key, value) in patch_map {
1031            updated_doc.set(&key, json_to_doc_value(value));
1032        }
1033    }
1034
1035    match state
1036        .document_engine
1037        .update(&collection, &doc_id, updated_doc)
1038    {
1039        Ok(()) => {
1040            state.flush_collection(&collection);
1041            // Fetch the updated document to return it
1042            match state.document_engine.get(&collection, &doc_id) {
1043                Ok(Some(final_doc)) => {
1044                    let response = DocumentResponse {
1045                        id: final_doc.id.to_string(),
1046                        collection: collection.clone(),
1047                        data: doc_to_json(&final_doc),
1048                    };
1049                    (
1050                        StatusCode::OK,
1051                        Json(serde_json::json!({"success": true, "document": response})),
1052                    )
1053                }
1054                _ => (
1055                    StatusCode::OK,
1056                    Json(serde_json::json!({"success": true, "id": id})),
1057                ),
1058            }
1059        }
1060        Err(e) => (
1061            StatusCode::INTERNAL_SERVER_ERROR,
1062            Json(serde_json::json!({"success": false, "error": e.to_string()})),
1063        ),
1064    }
1065}
1066
1067/// Create collection request.
1068#[derive(Debug, Deserialize)]
1069pub struct CreateCollectionRequest {
1070    pub name: String,
1071}
1072
1073/// Create a new collection.
1074pub async fn create_collection(
1075    State(state): State<AppState>,
1076    Json(request): Json<CreateCollectionRequest>,
1077) -> impl IntoResponse {
1078    state
1079        .activity
1080        .log_write(&format!("Create collection: {}", request.name), None);
1081
1082    match state.document_engine.create_collection(&request.name) {
1083        Ok(()) => (
1084            StatusCode::CREATED,
1085            Json(serde_json::json!({"success": true, "collection": request.name})),
1086        ),
1087        Err(e) => (
1088            StatusCode::BAD_REQUEST,
1089            Json(serde_json::json!({"success": false, "error": e.to_string()})),
1090        ),
1091    }
1092}
1093
1094/// Insert document request.
1095#[derive(Debug, Deserialize)]
1096pub struct InsertDocumentRequest {
1097    /// Optional explicit document ID (takes precedence over _id in document)
1098    pub id: Option<String>,
1099    pub document: serde_json::Value,
1100}
1101
1102/// Insert a document into a collection.
1103pub async fn insert_document(
1104    State(state): State<AppState>,
1105    Path(collection): Path<String>,
1106    Json(request): Json<InsertDocumentRequest>,
1107) -> impl IntoResponse {
1108    state
1109        .activity
1110        .log_write(&format!("Insert document into: {}", collection), None);
1111
1112    // If id is provided at top level, inject it into the document
1113    let doc_json = if let Some(id) = request.id {
1114        let mut doc = request.document;
1115        if let serde_json::Value::Object(ref mut map) = doc {
1116            map.insert("_id".to_string(), serde_json::Value::String(id));
1117        }
1118        doc
1119    } else {
1120        request.document
1121    };
1122
1123    let doc = json_to_doc(doc_json);
1124    match state.document_engine.insert(&collection, doc) {
1125        Ok(id) => {
1126            state.flush_collection(&collection);
1127            (
1128                StatusCode::CREATED,
1129                Json(serde_json::json!({"success": true, "id": id.to_string()})),
1130            )
1131        }
1132        Err(e) => (
1133            StatusCode::BAD_REQUEST,
1134            Json(serde_json::json!({"success": false, "error": e.to_string()})),
1135        ),
1136    }
1137}
1138
1139/// Helper to convert Document to JSON.
1140fn doc_to_json(doc: &Document) -> serde_json::Value {
1141    let mut map = serde_json::Map::new();
1142    map.insert(
1143        "_id".to_string(),
1144        serde_json::Value::String(doc.id.to_string()),
1145    );
1146    // Add document fields
1147    for (key, value) in &doc.data {
1148        map.insert(key.clone(), aegis_doc_value_to_json(value));
1149    }
1150    serde_json::Value::Object(map)
1151}
1152
1153/// Helper to convert aegis_document::Value to JSON.
1154fn aegis_doc_value_to_json(value: &aegis_document::Value) -> serde_json::Value {
1155    match value {
1156        aegis_document::Value::Null => serde_json::Value::Null,
1157        aegis_document::Value::Bool(b) => serde_json::Value::Bool(*b),
1158        aegis_document::Value::Int(i) => serde_json::Value::Number((*i).into()),
1159        aegis_document::Value::Float(f) => serde_json::Number::from_f64(*f)
1160            .map(serde_json::Value::Number)
1161            .unwrap_or(serde_json::Value::Null),
1162        aegis_document::Value::String(s) => serde_json::Value::String(s.clone()),
1163        aegis_document::Value::Array(arr) => {
1164            serde_json::Value::Array(arr.iter().map(aegis_doc_value_to_json).collect())
1165        }
1166        aegis_document::Value::Object(obj) => {
1167            let map: serde_json::Map<String, serde_json::Value> = obj
1168                .iter()
1169                .map(|(k, v)| (k.clone(), aegis_doc_value_to_json(v)))
1170                .collect();
1171            serde_json::Value::Object(map)
1172        }
1173    }
1174}
1175
1176/// Helper to convert JSON to Document.
1177fn json_to_doc(json: serde_json::Value) -> Document {
1178    // Check for _id or id field to use as document ID
1179    // Priority: _id > id
1180    let doc_id = json
1181        .get("_id")
1182        .or_else(|| json.get("id"))
1183        .and_then(|v| v.as_str());
1184
1185    let mut doc = match doc_id {
1186        Some(id) => Document::with_id(id),
1187        None => Document::new(),
1188    };
1189
1190    if let serde_json::Value::Object(map) = json {
1191        for (key, value) in map {
1192            // Only skip _id (internal ID field), preserve all other fields including "id"
1193            if key != "_id" {
1194                doc.set(&key, json_to_doc_value(value));
1195            }
1196        }
1197    }
1198    doc
1199}
1200
1201/// Helper to convert JSON to aegis_document::Value.
1202fn json_to_doc_value(json: serde_json::Value) -> aegis_document::Value {
1203    match json {
1204        serde_json::Value::Null => aegis_document::Value::Null,
1205        serde_json::Value::Bool(b) => aegis_document::Value::Bool(b),
1206        serde_json::Value::Number(n) => {
1207            if let Some(i) = n.as_i64() {
1208                aegis_document::Value::Int(i)
1209            } else if let Some(f) = n.as_f64() {
1210                aegis_document::Value::Float(f)
1211            } else {
1212                aegis_document::Value::Null
1213            }
1214        }
1215        serde_json::Value::String(s) => aegis_document::Value::String(s),
1216        serde_json::Value::Array(arr) => {
1217            aegis_document::Value::Array(arr.into_iter().map(json_to_doc_value).collect())
1218        }
1219        serde_json::Value::Object(map) => aegis_document::Value::Object(
1220            map.into_iter()
1221                .map(|(k, v)| (k, json_to_doc_value(v)))
1222                .collect(),
1223        ),
1224    }
1225}
1226
1227/// List documents in a collection (GET /collections/:name/documents).
1228pub async fn list_collection_documents(
1229    State(state): State<AppState>,
1230    Path(collection): Path<String>,
1231    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
1232) -> impl IntoResponse {
1233    state.activity.log(
1234        ActivityType::Query,
1235        &format!("List documents in: {}", collection),
1236    );
1237
1238    let limit = params.get("limit").and_then(|s| s.parse().ok());
1239    let skip = params.get("skip").and_then(|s| s.parse().ok());
1240
1241    let mut query = DocQuery::new();
1242    if let Some(limit) = limit {
1243        query = query.with_limit(limit);
1244    }
1245    if let Some(skip) = skip {
1246        query = query.with_skip(skip);
1247    }
1248
1249    match state.document_engine.find(&collection, &query) {
1250        Ok(result) => {
1251            let docs: Vec<DocumentResponse> = result
1252                .documents
1253                .iter()
1254                .map(|doc| DocumentResponse {
1255                    id: doc.id.to_string(),
1256                    collection: collection.clone(),
1257                    data: doc_to_json(doc),
1258                })
1259                .collect();
1260            let response = CollectionQueryResponse {
1261                documents: docs,
1262                total_scanned: result.total_scanned,
1263                execution_time_ms: result.execution_time_ms,
1264            };
1265            (StatusCode::OK, Json(response))
1266        }
1267        Err(_e) => {
1268            let empty = CollectionQueryResponse {
1269                documents: vec![],
1270                total_scanned: 0,
1271                execution_time_ms: 0,
1272            };
1273            (StatusCode::NOT_FOUND, Json(empty))
1274        }
1275    }
1276}
1277
1278/// Document query request with MongoDB-style filter operators.
1279#[derive(Debug, Deserialize)]
1280pub struct DocumentQueryRequest {
1281    #[serde(default)]
1282    pub filter: serde_json::Value,
1283    pub limit: Option<usize>,
1284    pub skip: Option<usize>,
1285    pub sort: Option<SortSpec>,
1286}
1287
1288/// Sort specification for queries.
1289#[derive(Debug, Deserialize)]
1290pub struct SortSpec {
1291    pub field: String,
1292    #[serde(default = "default_ascending")]
1293    pub ascending: bool,
1294}
1295
1296fn default_ascending() -> bool {
1297    true
1298}
1299
1300/// Query documents with filter operators (POST /collections/:name/query).
1301/// Supports MongoDB-style operators: $eq, $ne, $gt, $gte, $lt, $lte, $in, $nin, $exists, $regex, $and, $or
1302pub async fn query_collection_documents(
1303    State(state): State<AppState>,
1304    Path(collection): Path<String>,
1305    Json(request): Json<DocumentQueryRequest>,
1306) -> impl IntoResponse {
1307    state.activity.log(
1308        ActivityType::Query,
1309        &format!("Query collection: {}", collection),
1310    );
1311
1312    // Parse the filter into Query filters
1313    let mut query = DocQuery::new();
1314
1315    if let serde_json::Value::Object(filter_map) = &request.filter {
1316        for (field, condition) in filter_map {
1317            if let Some(filter) = parse_filter_condition(field, condition) {
1318                query = query.with_filter(filter);
1319            }
1320        }
1321    }
1322
1323    if let Some(limit) = request.limit {
1324        query = query.with_limit(limit);
1325    }
1326    if let Some(skip) = request.skip {
1327        query = query.with_skip(skip);
1328    }
1329    if let Some(ref sort) = request.sort {
1330        query = query.with_sort(&sort.field, sort.ascending);
1331    }
1332
1333    match state.document_engine.find(&collection, &query) {
1334        Ok(result) => {
1335            let docs: Vec<DocumentResponse> = result
1336                .documents
1337                .iter()
1338                .map(|doc| DocumentResponse {
1339                    id: doc.id.to_string(),
1340                    collection: collection.clone(),
1341                    data: doc_to_json(doc),
1342                })
1343                .collect();
1344            let response = CollectionQueryResponse {
1345                documents: docs,
1346                total_scanned: result.total_scanned,
1347                execution_time_ms: result.execution_time_ms,
1348            };
1349            (StatusCode::OK, Json(response))
1350        }
1351        Err(_) => {
1352            let empty = CollectionQueryResponse {
1353                documents: vec![],
1354                total_scanned: 0,
1355                execution_time_ms: 0,
1356            };
1357            (StatusCode::NOT_FOUND, Json(empty))
1358        }
1359    }
1360}
1361
1362/// Parse a filter condition with MongoDB-style operators.
1363fn parse_filter_condition(
1364    field: &str,
1365    condition: &serde_json::Value,
1366) -> Option<aegis_document::query::Filter> {
1367    use aegis_document::query::Filter;
1368
1369    match condition {
1370        // Direct value comparison (implicit $eq)
1371        serde_json::Value::Null
1372        | serde_json::Value::Bool(_)
1373        | serde_json::Value::Number(_)
1374        | serde_json::Value::String(_) => Some(Filter::Eq {
1375            field: field.to_string(),
1376            value: json_to_doc_value(condition.clone()),
1377        }),
1378        // Operator object
1379        serde_json::Value::Object(ops) => {
1380            // Handle $and and $or at the top level
1381            if field == "$and" {
1382                if let serde_json::Value::Array(arr) = condition {
1383                    let filters: Vec<Filter> = arr
1384                        .iter()
1385                        .filter_map(|item| {
1386                            if let serde_json::Value::Object(obj) = item {
1387                                obj.iter()
1388                                    .filter_map(|(k, v)| parse_filter_condition(k, v))
1389                                    .next()
1390                            } else {
1391                                None
1392                            }
1393                        })
1394                        .collect();
1395                    return Some(Filter::And(filters));
1396                }
1397                return None;
1398            }
1399            if field == "$or" {
1400                if let serde_json::Value::Array(arr) = condition {
1401                    let filters: Vec<Filter> = arr
1402                        .iter()
1403                        .filter_map(|item| {
1404                            if let serde_json::Value::Object(obj) = item {
1405                                obj.iter()
1406                                    .filter_map(|(k, v)| parse_filter_condition(k, v))
1407                                    .next()
1408                            } else {
1409                                None
1410                            }
1411                        })
1412                        .collect();
1413                    return Some(Filter::Or(filters));
1414                }
1415                return None;
1416            }
1417
1418            // Single operator or multiple operators on same field
1419            let mut filters: Vec<Filter> = Vec::new();
1420
1421            for (op, value) in ops {
1422                let filter = match op.as_str() {
1423                    "$eq" => Some(Filter::Eq {
1424                        field: field.to_string(),
1425                        value: json_to_doc_value(value.clone()),
1426                    }),
1427                    "$ne" => Some(Filter::Ne {
1428                        field: field.to_string(),
1429                        value: json_to_doc_value(value.clone()),
1430                    }),
1431                    "$gt" => Some(Filter::Gt {
1432                        field: field.to_string(),
1433                        value: json_to_doc_value(value.clone()),
1434                    }),
1435                    "$gte" => Some(Filter::Gte {
1436                        field: field.to_string(),
1437                        value: json_to_doc_value(value.clone()),
1438                    }),
1439                    "$lt" => Some(Filter::Lt {
1440                        field: field.to_string(),
1441                        value: json_to_doc_value(value.clone()),
1442                    }),
1443                    "$lte" => Some(Filter::Lte {
1444                        field: field.to_string(),
1445                        value: json_to_doc_value(value.clone()),
1446                    }),
1447                    "$in" => {
1448                        if let serde_json::Value::Array(arr) = value {
1449                            Some(Filter::In {
1450                                field: field.to_string(),
1451                                values: arr.iter().map(|v| json_to_doc_value(v.clone())).collect(),
1452                            })
1453                        } else {
1454                            None
1455                        }
1456                    }
1457                    "$nin" => {
1458                        if let serde_json::Value::Array(arr) = value {
1459                            Some(Filter::Nin {
1460                                field: field.to_string(),
1461                                values: arr.iter().map(|v| json_to_doc_value(v.clone())).collect(),
1462                            })
1463                        } else {
1464                            None
1465                        }
1466                    }
1467                    "$exists" => {
1468                        if let serde_json::Value::Bool(b) = value {
1469                            Some(Filter::Exists {
1470                                field: field.to_string(),
1471                                exists: *b,
1472                            })
1473                        } else {
1474                            None
1475                        }
1476                    }
1477                    "$regex" => {
1478                        if let serde_json::Value::String(pattern) = value {
1479                            Some(Filter::Regex {
1480                                field: field.to_string(),
1481                                pattern: pattern.clone(),
1482                            })
1483                        } else {
1484                            None
1485                        }
1486                    }
1487                    "$contains" => {
1488                        if let serde_json::Value::String(s) = value {
1489                            Some(Filter::Contains {
1490                                field: field.to_string(),
1491                                value: s.clone(),
1492                            })
1493                        } else {
1494                            None
1495                        }
1496                    }
1497                    "$startsWith" => {
1498                        if let serde_json::Value::String(s) = value {
1499                            Some(Filter::StartsWith {
1500                                field: field.to_string(),
1501                                value: s.clone(),
1502                            })
1503                        } else {
1504                            None
1505                        }
1506                    }
1507                    "$endsWith" => {
1508                        if let serde_json::Value::String(s) = value {
1509                            Some(Filter::EndsWith {
1510                                field: field.to_string(),
1511                                value: s.clone(),
1512                            })
1513                        } else {
1514                            None
1515                        }
1516                    }
1517                    _ => None,
1518                };
1519
1520                if let Some(f) = filter {
1521                    filters.push(f);
1522                }
1523            }
1524
1525            // If multiple operators on same field, combine with AND
1526            match filters.len() {
1527                0 => None,
1528                1 => filters.into_iter().next(),
1529                _ => Some(Filter::And(filters)),
1530            }
1531        }
1532        serde_json::Value::Array(_) => None,
1533    }
1534}
1535
1536// =============================================================================
1537// Time Series Endpoints (REAL IMPLEMENTATION)
1538// =============================================================================
1539
1540/// Register metric request.
1541#[derive(Debug, Deserialize)]
1542pub struct RegisterMetricRequest {
1543    pub name: String,
1544    #[serde(default = "default_metric_type")]
1545    pub metric_type: String,
1546    pub description: Option<String>,
1547    pub unit: Option<String>,
1548}
1549
1550fn default_metric_type() -> String {
1551    "gauge".to_string()
1552}
1553
1554/// Register a new metric with type information.
1555pub async fn register_metric(
1556    State(state): State<AppState>,
1557    Json(request): Json<RegisterMetricRequest>,
1558) -> impl IntoResponse {
1559    state
1560        .activity
1561        .log_write(&format!("Register metric: {}", request.name), None);
1562
1563    let metric_type = match request.metric_type.to_lowercase().as_str() {
1564        "counter" => MetricType::Counter,
1565        "gauge" => MetricType::Gauge,
1566        "histogram" => MetricType::Histogram,
1567        "summary" => MetricType::Summary,
1568        _ => MetricType::Gauge,
1569    };
1570
1571    let mut metric = Metric::new(&request.name);
1572    metric.metric_type = metric_type;
1573    metric.description = request.description;
1574    metric.unit = request.unit;
1575
1576    match state.timeseries_engine.register_metric(metric) {
1577        Ok(()) => (
1578            StatusCode::CREATED,
1579            Json(serde_json::json!({
1580                "success": true,
1581                "metric": request.name
1582            })),
1583        ),
1584        Err(e) => (
1585            StatusCode::BAD_REQUEST,
1586            Json(serde_json::json!({
1587                "success": false,
1588                "error": e.to_string()
1589            })),
1590        ),
1591    }
1592}
1593
1594/// Write time series data request.
1595#[derive(Debug, Deserialize)]
1596pub struct WriteTimeSeriesRequest {
1597    pub metric: String,
1598    #[serde(default)]
1599    pub tags: std::collections::HashMap<String, String>,
1600    pub value: f64,
1601    pub timestamp: Option<i64>,
1602}
1603
1604/// Write time series data.
1605pub async fn write_timeseries(
1606    State(state): State<AppState>,
1607    Json(request): Json<WriteTimeSeriesRequest>,
1608) -> impl IntoResponse {
1609    state
1610        .activity
1611        .log_write(&format!("Write timeseries: {}", request.metric), None);
1612
1613    let mut tags = Tags::new();
1614    for (k, v) in request.tags {
1615        tags.insert(&k, &v);
1616    }
1617
1618    let point = if let Some(ts) = request.timestamp {
1619        DataPoint {
1620            timestamp: chrono::DateTime::from_timestamp(ts, 0).unwrap_or_else(Utc::now),
1621            value: request.value,
1622        }
1623    } else {
1624        DataPoint {
1625            timestamp: Utc::now(),
1626            value: request.value,
1627        }
1628    };
1629
1630    match state.timeseries_engine.write(&request.metric, tags, point) {
1631        Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
1632        Err(e) => (
1633            StatusCode::BAD_REQUEST,
1634            Json(serde_json::json!({"success": false, "error": e.to_string()})),
1635        ),
1636    }
1637}
1638
1639/// Query time series request.
1640#[derive(Debug, Deserialize)]
1641pub struct QueryTimeSeriesRequest {
1642    pub metric: String,
1643    #[serde(default)]
1644    pub tags: Option<std::collections::HashMap<String, String>>,
1645    pub start: Option<i64>,
1646    pub end: Option<i64>,
1647    pub limit: Option<usize>,
1648}
1649
1650/// Time series data response.
1651#[derive(Debug, Serialize)]
1652pub struct TimeSeriesResponse {
1653    pub metric: String,
1654    pub series: Vec<SeriesResponse>,
1655    pub points_returned: usize,
1656    pub query_time_ms: u64,
1657}
1658
1659#[derive(Debug, Serialize)]
1660pub struct SeriesResponse {
1661    pub tags: std::collections::HashMap<String, String>,
1662    pub points: Vec<PointResponse>,
1663}
1664
1665#[derive(Debug, Serialize)]
1666pub struct PointResponse {
1667    pub timestamp: i64,
1668    pub value: f64,
1669}
1670
1671/// Query time series data.
1672pub async fn query_timeseries(
1673    State(state): State<AppState>,
1674    Json(request): Json<QueryTimeSeriesRequest>,
1675) -> impl IntoResponse {
1676    state.activity.log(
1677        ActivityType::Query,
1678        &format!("Query timeseries: {}", request.metric),
1679    );
1680
1681    let duration = Duration::hours(24); // Default 24h lookback
1682    let mut query = TimeSeriesQuery::last(&request.metric, duration);
1683
1684    if let Some(limit) = request.limit {
1685        query = query.with_limit(limit);
1686    }
1687
1688    if let Some(ref tags_map) = request.tags {
1689        let mut tags = Tags::new();
1690        for (k, v) in tags_map {
1691            tags.insert(k, v);
1692        }
1693        query = query.with_tags(tags);
1694    }
1695
1696    let result = state.timeseries_engine.query(&query);
1697
1698    let series: Vec<SeriesResponse> = result
1699        .series
1700        .iter()
1701        .map(|s| SeriesResponse {
1702            tags: s.tags.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
1703            points: s
1704                .points
1705                .iter()
1706                .map(|p| PointResponse {
1707                    timestamp: p.timestamp.timestamp(),
1708                    value: p.value,
1709                })
1710                .collect(),
1711        })
1712        .collect();
1713
1714    let response = TimeSeriesResponse {
1715        metric: request.metric,
1716        series,
1717        points_returned: result.points_returned,
1718        query_time_ms: result.query_time_ms,
1719    };
1720
1721    (StatusCode::OK, Json(response))
1722}
1723
1724/// Metric info response with full type information.
1725#[derive(Debug, Serialize)]
1726pub struct MetricInfoResponse {
1727    pub name: String,
1728    pub metric_type: String,
1729    pub description: Option<String>,
1730    pub unit: Option<String>,
1731}
1732
1733impl From<&Metric> for MetricInfoResponse {
1734    fn from(m: &Metric) -> Self {
1735        Self {
1736            name: m.name.clone(),
1737            metric_type: match m.metric_type {
1738                MetricType::Counter => "counter".to_string(),
1739                MetricType::Gauge => "gauge".to_string(),
1740                MetricType::Histogram => "histogram".to_string(),
1741                MetricType::Summary => "summary".to_string(),
1742            },
1743            description: m.description.clone(),
1744            unit: m.unit.clone(),
1745        }
1746    }
1747}
1748
1749/// List metrics with full type information.
1750pub async fn list_metrics(State(state): State<AppState>) -> Json<Vec<MetricInfoResponse>> {
1751    state.activity.log(ActivityType::Query, "Listed metrics");
1752    let metrics = state.timeseries_engine.list_metrics();
1753    Json(metrics.iter().map(MetricInfoResponse::from).collect())
1754}
1755
1756// =============================================================================
1757// Streaming Endpoints (REAL IMPLEMENTATION)
1758// =============================================================================
1759
1760/// Create channel request.
1761#[derive(Debug, Deserialize)]
1762pub struct CreateChannelRequest {
1763    pub id: String,
1764}
1765
1766/// Create a streaming channel.
1767pub async fn create_channel(
1768    State(state): State<AppState>,
1769    Json(request): Json<CreateChannelRequest>,
1770) -> impl IntoResponse {
1771    state
1772        .activity
1773        .log_write(&format!("Create channel: {}", request.id), None);
1774
1775    match state.streaming_engine.create_channel(request.id.clone()) {
1776        Ok(()) => (
1777            StatusCode::CREATED,
1778            Json(serde_json::json!({"success": true, "channel": request.id})),
1779        ),
1780        Err(e) => (
1781            StatusCode::BAD_REQUEST,
1782            Json(serde_json::json!({"success": false, "error": e.to_string()})),
1783        ),
1784    }
1785}
1786
1787/// List channels.
1788pub async fn list_channels(State(state): State<AppState>) -> Json<Vec<String>> {
1789    state.activity.log(ActivityType::Query, "Listed channels");
1790    let channels: Vec<String> = state
1791        .streaming_engine
1792        .list_channels()
1793        .into_iter()
1794        .map(|c| c.to_string())
1795        .collect();
1796    Json(channels)
1797}
1798
1799/// Publish event request.
1800#[derive(Debug, Deserialize)]
1801pub struct PublishEventRequest {
1802    pub channel: String,
1803    pub event_type: String,
1804    pub source: String,
1805    pub data: serde_json::Value,
1806}
1807
1808/// Publish an event to a channel.
1809pub async fn publish_event(
1810    State(state): State<AppState>,
1811    Json(request): Json<PublishEventRequest>,
1812) -> impl IntoResponse {
1813    state
1814        .activity
1815        .log_write(&format!("Publish to channel: {}", request.channel), None);
1816
1817    let event_type = match request.event_type.as_str() {
1818        "created" => StreamEventType::Created,
1819        "updated" => StreamEventType::Updated,
1820        "deleted" => StreamEventType::Deleted,
1821        _ => StreamEventType::Custom(request.event_type.clone()),
1822    };
1823
1824    let data = match request.data {
1825        serde_json::Value::String(s) => EventData::String(s),
1826        serde_json::Value::Number(n) => {
1827            if let Some(i) = n.as_i64() {
1828                EventData::Int(i)
1829            } else if let Some(f) = n.as_f64() {
1830                EventData::Float(f)
1831            } else {
1832                EventData::Null
1833            }
1834        }
1835        serde_json::Value::Bool(b) => EventData::Bool(b),
1836        serde_json::Value::Null => EventData::Null,
1837        _ => EventData::Json(request.data.clone()),
1838    };
1839
1840    let event = Event::new(event_type, &request.source, data);
1841    let channel_id = ChannelId::new(&request.channel);
1842
1843    match state.streaming_engine.publish(&channel_id, event) {
1844        Ok(receivers) => (
1845            StatusCode::OK,
1846            Json(serde_json::json!({"success": true, "receivers": receivers})),
1847        ),
1848        Err(e) => (
1849            StatusCode::BAD_REQUEST,
1850            Json(serde_json::json!({"success": false, "error": e.to_string()})),
1851        ),
1852    }
1853}
1854
1855/// Get channel history.
1856pub async fn get_channel_history(
1857    State(state): State<AppState>,
1858    Path(channel): Path<String>,
1859    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
1860) -> impl IntoResponse {
1861    let count = params
1862        .get("count")
1863        .and_then(|s| s.parse().ok())
1864        .unwrap_or(100);
1865    let channel_id = ChannelId::new(&channel);
1866
1867    match state.streaming_engine.get_history(&channel_id, count) {
1868        Ok(events) => {
1869            let event_data: Vec<serde_json::Value> = events
1870                .iter()
1871                .map(|e| {
1872                    serde_json::json!({
1873                        "id": e.id.to_string(),
1874                        "event_type": format!("{:?}", e.event_type),
1875                        "source": e.source,
1876                        "timestamp": e.timestamp,
1877                    })
1878                })
1879                .collect();
1880            (
1881                StatusCode::OK,
1882                Json(serde_json::json!({"events": event_data})),
1883            )
1884        }
1885        Err(e) => (
1886            StatusCode::NOT_FOUND,
1887            Json(serde_json::json!({"error": e.to_string()})),
1888        ),
1889    }
1890}
1891
1892// =============================================================================
1893// Graph Database Endpoints
1894// =============================================================================
1895
1896/// Graph data response (uses GraphNode and GraphEdge from state module).
1897#[derive(Debug, Serialize)]
1898pub struct GraphDataResponse {
1899    pub nodes: Vec<GraphNode>,
1900    pub edges: Vec<GraphEdge>,
1901}
1902
1903/// Get graph data.
1904pub async fn get_graph_data(State(state): State<AppState>) -> Json<GraphDataResponse> {
1905    state.activity.log(ActivityType::Query, "Query graph data");
1906
1907    let (nodes, edges) = state.graph_store.get_all();
1908
1909    Json(GraphDataResponse { nodes, edges })
1910}
1911
1912// =============================================================================
1913// Query Builder Endpoints
1914// =============================================================================
1915
1916/// Query execution request.
1917#[derive(Debug, Deserialize)]
1918pub struct ExecuteQueryRequest {
1919    pub query: String,
1920    #[serde(default)]
1921    pub database: Option<String>,
1922}
1923
1924/// Query execution response.
1925#[derive(Debug, Serialize)]
1926pub struct ExecuteQueryResponse {
1927    pub success: bool,
1928    pub columns: Vec<String>,
1929    pub rows: Vec<Vec<serde_json::Value>>,
1930    pub row_count: usize,
1931    pub execution_time_ms: u64,
1932    pub error: Option<String>,
1933}
1934
1935/// Execute a query from the query builder.
1936pub async fn execute_builder_query(
1937    State(state): State<AppState>,
1938    Json(request): Json<ExecuteQueryRequest>,
1939) -> Json<ExecuteQueryResponse> {
1940    let start = std::time::Instant::now();
1941    state.activity.log_query(&request.query, 0, None);
1942
1943    // Execute through the real query engine
1944    match state
1945        .query_engine
1946        .execute(&request.query, request.database.as_deref())
1947    {
1948        Ok(result) => Json(ExecuteQueryResponse {
1949            success: true,
1950            columns: result.columns,
1951            rows: result.rows,
1952            row_count: result.rows_affected as usize,
1953            execution_time_ms: start.elapsed().as_millis() as u64,
1954            error: None,
1955        }),
1956        Err(e) => Json(ExecuteQueryResponse {
1957            success: false,
1958            columns: vec![],
1959            rows: vec![],
1960            row_count: 0,
1961            execution_time_ms: start.elapsed().as_millis() as u64,
1962            error: Some(e.to_string()),
1963        }),
1964    }
1965}
1966
1967// =============================================================================
1968// Node Action Endpoints
1969// =============================================================================
1970
1971/// Generic action response.
1972#[derive(Debug, Serialize)]
1973pub struct NodeActionResponse {
1974    pub success: bool,
1975    pub message: String,
1976    pub node_id: String,
1977}
1978
1979/// Restart a node by sending a shutdown signal to the target.
1980/// PM2's autorestart will bring it back up.
1981pub async fn restart_node(
1982    State(state): State<AppState>,
1983    Path(node_id): Path<String>,
1984) -> Json<NodeActionResponse> {
1985    state
1986        .activity
1987        .log_node(&format!("Restarting node: {}", node_id));
1988
1989    // Find the peer's address
1990    let peers = state.admin.get_peers();
1991    let peer = peers
1992        .iter()
1993        .find(|p| p.id == node_id || p.name.as_deref() == Some(&node_id));
1994
1995    if let Some(peer) = peer {
1996        let address = peer.address.clone();
1997        // Send shutdown request to the target node asynchronously
1998        let client = reqwest::Client::builder()
1999            .timeout(std::time::Duration::from_secs(5))
2000            .build()
2001            .unwrap_or_default();
2002        let url = format!("{}/api/v1/cluster/shutdown", address);
2003        tokio::spawn(async move {
2004            let _ = client.post(&url).send().await;
2005        });
2006
2007        Json(NodeActionResponse {
2008            success: true,
2009            message: format!(
2010                "Node {} restart initiated at {}. PM2 will auto-restart.",
2011                node_id, address
2012            ),
2013            node_id,
2014        })
2015    } else {
2016        Json(NodeActionResponse {
2017            success: false,
2018            message: format!("Node {} not found in cluster peers.", node_id),
2019            node_id,
2020        })
2021    }
2022}
2023
2024/// Drain a node (mark as leaving, stop routing traffic).
2025pub async fn drain_node(
2026    State(state): State<AppState>,
2027    Path(node_id): Path<String>,
2028) -> Json<NodeActionResponse> {
2029    state
2030        .activity
2031        .log_node(&format!("Draining node: {}", node_id));
2032
2033    // Mark the node as leaving in the peer list
2034    let peers = state.admin.get_peers();
2035    let found = peers
2036        .iter()
2037        .any(|p| p.id == node_id || p.name.as_deref() == Some(&node_id));
2038
2039    if found {
2040        // Update peer status to Leaving so the router stops sending traffic
2041        state.admin.mark_peer_offline(&node_id);
2042
2043        Json(NodeActionResponse {
2044            success: true,
2045            message: format!(
2046                "Node {} marked as draining. Traffic will be redirected to other nodes.",
2047                node_id
2048            ),
2049            node_id,
2050        })
2051    } else {
2052        Json(NodeActionResponse {
2053            success: false,
2054            message: format!("Node {} not found in cluster peers.", node_id),
2055            node_id,
2056        })
2057    }
2058}
2059
2060/// Remove a node from the cluster.
2061pub async fn remove_node(
2062    State(state): State<AppState>,
2063    Path(node_id): Path<String>,
2064) -> impl IntoResponse {
2065    state
2066        .activity
2067        .log_node(&format!("Removing node from cluster: {}", node_id));
2068
2069    // Actually remove the peer from the admin service
2070    state.admin.remove_peer(&node_id);
2071
2072    (
2073        StatusCode::OK,
2074        Json(NodeActionResponse {
2075            success: true,
2076            message: format!("Node {} has been removed from the cluster.", node_id),
2077            node_id,
2078        }),
2079    )
2080}
2081
2082/// Graceful shutdown endpoint - called by restart_node on the target.
2083/// Flushes data and exits; PM2 auto-restarts the process.
2084pub async fn cluster_shutdown(State(state): State<AppState>) -> impl IntoResponse {
2085    state
2086        .activity
2087        .log_node("Graceful shutdown initiated via cluster API");
2088
2089    // Flush timeseries data
2090    state.timeseries_engine.flush();
2091
2092    // Give a brief moment for the response to be sent
2093    tokio::spawn(async {
2094        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2095        std::process::exit(0);
2096    });
2097
2098    (
2099        StatusCode::OK,
2100        Json(serde_json::json!({
2101            "status": "shutting_down",
2102            "message": "Node will restart via PM2"
2103        })),
2104    )
2105}
2106
2107/// Node logs entry.
2108#[derive(Debug, Serialize)]
2109pub struct NodeLogEntry {
2110    pub timestamp: String,
2111    pub level: String,
2112    pub message: String,
2113}
2114
2115/// Node logs response.
2116#[derive(Debug, Serialize)]
2117pub struct NodeLogsResponse {
2118    pub node_id: String,
2119    pub logs: Vec<NodeLogEntry>,
2120    pub total: usize,
2121}
2122
2123/// Get logs for a specific node.
2124pub async fn get_node_logs(
2125    State(state): State<AppState>,
2126    Path(node_id): Path<String>,
2127    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
2128) -> Json<NodeLogsResponse> {
2129    let limit: usize = params
2130        .get("limit")
2131        .and_then(|l| l.parse().ok())
2132        .unwrap_or(100);
2133
2134    // Get real activity logs from the server
2135    let activities = state.activity.get_recent(limit);
2136    let logs: Vec<NodeLogEntry> = activities
2137        .iter()
2138        .map(|a| NodeLogEntry {
2139            timestamp: a.timestamp.clone(),
2140            level: match a.activity_type {
2141                ActivityType::Auth | ActivityType::System => "INFO".to_string(),
2142                ActivityType::Write | ActivityType::Delete => "WARN".to_string(),
2143                ActivityType::Query | ActivityType::Config | ActivityType::Node => {
2144                    "INFO".to_string()
2145                }
2146            },
2147            message: a.description.clone(),
2148        })
2149        .collect();
2150
2151    let total = logs.len();
2152    Json(NodeLogsResponse {
2153        node_id,
2154        logs: logs.into_iter().take(limit).collect(),
2155        total,
2156    })
2157}
2158
2159// =============================================================================
2160// Settings Endpoints
2161// =============================================================================
2162
2163/// Server settings structure.
2164#[derive(Debug, Clone, Serialize, Deserialize)]
2165pub struct ServerSettings {
2166    pub replication_factor: u8,
2167    pub auto_backups_enabled: bool,
2168    pub backup_schedule: String,
2169    pub retention_days: u32,
2170    pub tls_enabled: bool,
2171    pub auth_required: bool,
2172    pub session_timeout_minutes: u32,
2173    pub require_2fa: bool,
2174    pub audit_logging_enabled: bool,
2175}
2176
2177impl Default for ServerSettings {
2178    fn default() -> Self {
2179        Self {
2180            replication_factor: 3,
2181            auto_backups_enabled: true,
2182            backup_schedule: "0 2 * * *".to_string(),
2183            retention_days: 30,
2184            tls_enabled: false,
2185            auth_required: true,
2186            session_timeout_minutes: 60,
2187            require_2fa: false,
2188            audit_logging_enabled: true,
2189        }
2190    }
2191}
2192
2193/// Get server settings.
2194pub async fn get_settings(State(state): State<AppState>) -> Json<ServerSettings> {
2195    state
2196        .activity
2197        .log(ActivityType::Config, "Retrieved server settings");
2198    let settings = state.settings.read().await;
2199    Json(settings.clone())
2200}
2201
2202/// Update server settings.
2203pub async fn update_settings(
2204    State(state): State<AppState>,
2205    Json(new_settings): Json<ServerSettings>,
2206) -> impl IntoResponse {
2207    state.activity.log_config("Updated server settings", None);
2208    let mut settings = state.settings.write().await;
2209    *settings = new_settings.clone();
2210    drop(settings);
2211    state.save_settings().await;
2212    (
2213        StatusCode::OK,
2214        Json(serde_json::json!({"success": true, "settings": new_settings})),
2215    )
2216}
2217
2218// =============================================================================
2219// User Management Endpoints
2220// =============================================================================
2221
2222/// User info response for list users.
2223#[derive(Debug, Clone, Serialize, Deserialize)]
2224pub struct UserListItem {
2225    pub id: String,
2226    pub username: String,
2227    pub email: String,
2228    pub role: String,
2229    pub mfa_enabled: bool,
2230    pub enabled: bool,
2231    pub created_at: String,
2232    pub last_login: Option<String>,
2233}
2234
2235/// List all users.
2236pub async fn list_users(State(state): State<AppState>) -> Json<Vec<UserListItem>> {
2237    state.activity.log(ActivityType::Query, "Listed users");
2238    let users = state.auth.list_users();
2239    let user_list: Vec<UserListItem> = users
2240        .iter()
2241        .map(|u| UserListItem {
2242            id: u.id.clone(),
2243            username: u.username.clone(),
2244            email: u.email.clone(),
2245            role: format!("{:?}", u.role).to_lowercase(),
2246            mfa_enabled: u.mfa_enabled,
2247            enabled: true,
2248            created_at: u.created_at.clone(),
2249            last_login: None,
2250        })
2251        .collect();
2252    Json(user_list)
2253}
2254
2255/// Create user request.
2256#[derive(Debug, Deserialize)]
2257pub struct CreateUserRequest {
2258    pub username: String,
2259    pub email: String,
2260    pub password: String,
2261    pub role: String,
2262}
2263
2264/// Create a new user.
2265pub async fn create_user(
2266    State(state): State<AppState>,
2267    Json(request): Json<CreateUserRequest>,
2268) -> impl IntoResponse {
2269    state
2270        .activity
2271        .log_write(&format!("Create user: {}", request.username), None);
2272
2273    match state.auth.create_user(
2274        &request.username,
2275        &request.email,
2276        &request.password,
2277        &request.role,
2278    ) {
2279        Ok(user) => (
2280            StatusCode::CREATED,
2281            Json(serde_json::json!({"success": true, "user": user})),
2282        ),
2283        Err(e) => (
2284            StatusCode::BAD_REQUEST,
2285            Json(serde_json::json!({"success": false, "error": e})),
2286        ),
2287    }
2288}
2289
2290/// Update user request.
2291#[derive(Debug, Deserialize)]
2292pub struct UpdateUserRequest {
2293    pub email: Option<String>,
2294    pub role: Option<String>,
2295    pub enabled: Option<bool>,
2296    pub password: Option<String>,
2297}
2298
2299/// Update a user.
2300pub async fn update_user(
2301    State(state): State<AppState>,
2302    Path(username): Path<String>,
2303    Json(request): Json<UpdateUserRequest>,
2304) -> impl IntoResponse {
2305    state
2306        .activity
2307        .log_write(&format!("Update user: {}", username), None);
2308
2309    match state
2310        .auth
2311        .update_user(&username, request.email, request.role, request.password)
2312    {
2313        Ok(user) => (
2314            StatusCode::OK,
2315            Json(serde_json::json!({"success": true, "user": user})),
2316        ),
2317        Err(e) => (
2318            StatusCode::NOT_FOUND,
2319            Json(serde_json::json!({"success": false, "error": e})),
2320        ),
2321    }
2322}
2323
2324/// Delete a user.
2325pub async fn delete_user(
2326    State(state): State<AppState>,
2327    Path(username): Path<String>,
2328) -> impl IntoResponse {
2329    state
2330        .activity
2331        .log(ActivityType::Delete, &format!("Delete user: {}", username));
2332
2333    match state.auth.delete_user(&username) {
2334        Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2335        Err(e) => (
2336            StatusCode::NOT_FOUND,
2337            Json(serde_json::json!({"success": false, "error": e})),
2338        ),
2339    }
2340}
2341
2342// =============================================================================
2343// Role Management Endpoints
2344// =============================================================================
2345
2346/// Role info for API responses.
2347#[derive(Debug, Clone, Serialize, Deserialize)]
2348pub struct RoleInfo {
2349    pub name: String,
2350    pub description: String,
2351    pub permissions: Vec<String>,
2352    pub created_at: String,
2353    pub is_builtin: bool,
2354}
2355
2356/// List all roles.
2357pub async fn list_roles(State(state): State<AppState>) -> Json<Vec<RoleInfo>> {
2358    state.activity.log(ActivityType::Query, "Listed roles");
2359    let roles = state.rbac.list_roles();
2360    let role_list: Vec<RoleInfo> = roles
2361        .iter()
2362        .map(|r| RoleInfo {
2363            name: r.name.clone(),
2364            description: r.description.clone(),
2365            permissions: r
2366                .permissions
2367                .iter()
2368                .map(|p| format!("{:?}", p).to_lowercase())
2369                .collect(),
2370            created_at: format_timestamp_ms(r.created_at),
2371            is_builtin: r.name == "admin"
2372                || r.name == "operator"
2373                || r.name == "viewer"
2374                || r.name == "analyst",
2375        })
2376        .collect();
2377    Json(role_list)
2378}
2379
2380/// Create role request.
2381#[derive(Debug, Deserialize)]
2382pub struct CreateRoleRequest {
2383    pub name: String,
2384    pub description: String,
2385    pub permissions: Vec<String>,
2386}
2387
2388/// Create a new role.
2389pub async fn create_role(
2390    State(state): State<AppState>,
2391    Json(request): Json<CreateRoleRequest>,
2392) -> impl IntoResponse {
2393    state
2394        .activity
2395        .log_write(&format!("Create role: {}", request.name), None);
2396
2397    // Parse permission strings into Permission enum
2398    let permissions = parse_permissions(&request.permissions);
2399
2400    match state
2401        .rbac
2402        .create_role(&request.name, &request.description, permissions, "admin")
2403    {
2404        Ok(()) => {
2405            let role = state.rbac.get_role(&request.name);
2406            (
2407                StatusCode::CREATED,
2408                Json(
2409                    serde_json::json!({"success": true, "role": request.name, "permissions": role.map(|r| r.permissions.len()).unwrap_or(0)}),
2410                ),
2411            )
2412        }
2413        Err(e) => (
2414            StatusCode::BAD_REQUEST,
2415            Json(serde_json::json!({"success": false, "error": e})),
2416        ),
2417    }
2418}
2419
2420/// Delete a role.
2421pub async fn delete_role(
2422    State(state): State<AppState>,
2423    Path(name): Path<String>,
2424) -> impl IntoResponse {
2425    state
2426        .activity
2427        .log(ActivityType::Delete, &format!("Delete role: {}", name));
2428
2429    match state.rbac.delete_role(&name) {
2430        Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2431        Err(e) => (
2432            StatusCode::BAD_REQUEST,
2433            Json(serde_json::json!({"success": false, "error": e})),
2434        ),
2435    }
2436}
2437
2438/// Parse permission strings to Permission enums.
2439fn parse_permissions(perms: &[String]) -> Vec<crate::auth::Permission> {
2440    use crate::auth::Permission;
2441    perms
2442        .iter()
2443        .filter_map(|p| match p.to_lowercase().as_str() {
2444            "database_create" | "databasecreate" => Some(Permission::DatabaseCreate),
2445            "database_drop" | "databasedrop" => Some(Permission::DatabaseDrop),
2446            "database_list" | "databaselist" => Some(Permission::DatabaseList),
2447            "table_create" | "tablecreate" => Some(Permission::TableCreate),
2448            "table_drop" | "tabledrop" => Some(Permission::TableDrop),
2449            "table_alter" | "tablealter" => Some(Permission::TableAlter),
2450            "table_list" | "tablelist" => Some(Permission::TableList),
2451            "data_select" | "dataselect" | "data:read" => Some(Permission::DataSelect),
2452            "data_insert" | "datainsert" | "data:write" => Some(Permission::DataInsert),
2453            "data_update" | "dataupdate" => Some(Permission::DataUpdate),
2454            "data_delete" | "datadelete" => Some(Permission::DataDelete),
2455            "user_create" | "usercreate" => Some(Permission::UserCreate),
2456            "user_delete" | "userdelete" => Some(Permission::UserDelete),
2457            "user_modify" | "usermodify" => Some(Permission::UserModify),
2458            "role_create" | "rolecreate" => Some(Permission::RoleCreate),
2459            "role_delete" | "roledelete" => Some(Permission::RoleDelete),
2460            "role_assign" | "roleassign" => Some(Permission::RoleAssign),
2461            "config_view" | "configview" => Some(Permission::ConfigView),
2462            "config_modify" | "configmodify" => Some(Permission::ConfigModify),
2463            "metrics_view" | "metricsview" => Some(Permission::MetricsView),
2464            "logs_view" | "logsview" => Some(Permission::LogsView),
2465            "backup_create" | "backupcreate" => Some(Permission::BackupCreate),
2466            "backup_restore" | "backuprestore" => Some(Permission::BackupRestore),
2467            "node_add" | "nodeadd" => Some(Permission::NodeAdd),
2468            "node_remove" | "noderemove" => Some(Permission::NodeRemove),
2469            "cluster_manage" | "clustermanage" => Some(Permission::ClusterManage),
2470            _ => None,
2471        })
2472        .collect()
2473}
2474
2475/// Format timestamp from milliseconds to ISO string.
2476fn format_timestamp_ms(timestamp_ms: u64) -> String {
2477    let secs = timestamp_ms / 1000;
2478    let datetime = std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs);
2479    let duration = datetime
2480        .duration_since(std::time::UNIX_EPOCH)
2481        .unwrap_or_default();
2482    let total_secs = duration.as_secs();
2483
2484    let days_since_epoch = total_secs / 86400;
2485    let secs_today = total_secs % 86400;
2486    let hours = secs_today / 3600;
2487    let minutes = (secs_today % 3600) / 60;
2488    let seconds = secs_today % 60;
2489
2490    let mut year = 1970u64;
2491    let mut remaining_days = days_since_epoch;
2492    loop {
2493        let days_in_year = if (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) {
2494            366
2495        } else {
2496            365
2497        };
2498        if remaining_days < days_in_year {
2499            break;
2500        }
2501        remaining_days -= days_in_year;
2502        year += 1;
2503    }
2504
2505    let days_in_months: [u64; 12] = if (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0) {
2506        [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
2507    } else {
2508        [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
2509    };
2510
2511    let mut month = 1u64;
2512    for &days in &days_in_months {
2513        if remaining_days < days {
2514            break;
2515        }
2516        remaining_days -= days;
2517        month += 1;
2518    }
2519    let day = remaining_days + 1;
2520
2521    format!(
2522        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
2523        year, month, day, hours, minutes, seconds
2524    )
2525}
2526
2527// =============================================================================
2528// Metrics Timeseries Endpoint
2529// =============================================================================
2530
2531/// Metrics timeseries request.
2532#[derive(Debug, Deserialize)]
2533pub struct MetricsTimeseriesRequest {
2534    pub time_range: String,
2535}
2536
2537/// Metrics data point.
2538#[derive(Debug, Clone, Serialize, Deserialize)]
2539pub struct MetricsDataPoint {
2540    pub timestamp: i64,
2541    pub cpu_percent: f64,
2542    pub memory_percent: f64,
2543    pub queries_per_second: f64,
2544    pub latency_ms: f64,
2545    pub connections: u64,
2546    pub bytes_in: u64,
2547    pub bytes_out: u64,
2548}
2549
2550/// Metrics timeseries response.
2551#[derive(Debug, Serialize)]
2552pub struct MetricsTimeseriesResponse {
2553    pub time_range: String,
2554    pub data_points: Vec<MetricsDataPoint>,
2555}
2556
2557/// Get metrics timeseries data.
2558pub async fn get_metrics_timeseries(
2559    State(state): State<AppState>,
2560    Json(request): Json<MetricsTimeseriesRequest>,
2561) -> Json<MetricsTimeseriesResponse> {
2562    state.activity.log(
2563        ActivityType::Query,
2564        &format!("Query metrics timeseries: {}", request.time_range),
2565    );
2566
2567    // Get time range in seconds
2568    let range_secs: i64 = match request.time_range.as_str() {
2569        "1h" => 3600,
2570        "6h" => 6 * 3600,
2571        "24h" => 24 * 3600,
2572        "7d" => 7 * 24 * 3600,
2573        "30d" => 30 * 24 * 3600,
2574        _ => 3600,
2575    };
2576
2577    // Get metrics history from state
2578    let history = state.metrics_history.read().await;
2579    let now = Utc::now().timestamp();
2580    let start_time = now - range_secs;
2581
2582    // Filter to requested time range
2583    let data_points: Vec<MetricsDataPoint> = history
2584        .iter()
2585        .filter(|p| p.timestamp >= start_time)
2586        .cloned()
2587        .collect();
2588
2589    Json(MetricsTimeseriesResponse {
2590        time_range: request.time_range,
2591        data_points,
2592    })
2593}
2594
2595// =============================================================================
2596// Graph Database Endpoints (Real Implementation)
2597// =============================================================================
2598
2599/// Create a graph node.
2600#[derive(Debug, Deserialize)]
2601pub struct CreateNodeRequest {
2602    pub label: String,
2603    pub properties: serde_json::Value,
2604}
2605
2606/// Create a graph edge.
2607#[derive(Debug, Deserialize)]
2608pub struct CreateEdgeRequest {
2609    pub source: String,
2610    pub target: String,
2611    pub relationship: String,
2612}
2613
2614/// Create a new graph node.
2615pub async fn create_graph_node(
2616    State(state): State<AppState>,
2617    Json(request): Json<CreateNodeRequest>,
2618) -> impl IntoResponse {
2619    state
2620        .activity
2621        .log_write(&format!("Create graph node: {}", request.label), None);
2622
2623    let node = state
2624        .graph_store
2625        .create_node(&request.label, request.properties);
2626    (
2627        StatusCode::CREATED,
2628        Json(serde_json::json!({"success": true, "node": node})),
2629    )
2630}
2631
2632/// Create a new graph edge.
2633pub async fn create_graph_edge(
2634    State(state): State<AppState>,
2635    Json(request): Json<CreateEdgeRequest>,
2636) -> impl IntoResponse {
2637    state.activity.log_write(
2638        &format!(
2639            "Create graph edge: {} -> {}",
2640            request.source, request.target
2641        ),
2642        None,
2643    );
2644
2645    match state
2646        .graph_store
2647        .create_edge(&request.source, &request.target, &request.relationship)
2648    {
2649        Ok(edge) => (
2650            StatusCode::CREATED,
2651            Json(serde_json::json!({"success": true, "edge": edge})),
2652        ),
2653        Err(e) => (
2654            StatusCode::BAD_REQUEST,
2655            Json(serde_json::json!({"success": false, "error": e})),
2656        ),
2657    }
2658}
2659
2660/// Delete a graph node.
2661pub async fn delete_graph_node(
2662    State(state): State<AppState>,
2663    Path(node_id): Path<String>,
2664) -> impl IntoResponse {
2665    state.activity.log(
2666        ActivityType::Delete,
2667        &format!("Delete graph node: {}", node_id),
2668    );
2669
2670    match state.graph_store.delete_node(&node_id) {
2671        Ok(()) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
2672        Err(e) => (
2673            StatusCode::NOT_FOUND,
2674            Json(serde_json::json!({"success": false, "error": e})),
2675        ),
2676    }
2677}
2678
2679// =============================================================================
2680// OTA Update Handlers
2681// =============================================================================
2682
2683/// Get version information for all cluster nodes.
2684pub async fn get_update_version(State(state): State<AppState>) -> impl IntoResponse {
2685    let version = aegis_updates::version::VERSION;
2686    let node_name = state
2687        .config
2688        .node_name
2689        .clone()
2690        .unwrap_or_else(|| "unknown".to_string());
2691
2692    (
2693        StatusCode::OK,
2694        Json(serde_json::json!({
2695            "success": true,
2696            "version": version,
2697            "node_id": state.config.node_id,
2698            "node_name": node_name,
2699        })),
2700    )
2701}
2702
2703/// Create an update plan.
2704#[derive(serde::Deserialize)]
2705pub struct CreateUpdatePlanRequest {
2706    pub version: String,
2707    pub binary_url: String,
2708    pub sha256: String,
2709}
2710
2711pub async fn create_update_plan(
2712    State(state): State<AppState>,
2713    Json(request): Json<CreateUpdatePlanRequest>,
2714) -> impl IntoResponse {
2715    state.activity.log_system(&format!(
2716        "Creating update plan for version {}",
2717        request.version
2718    ));
2719
2720    // Populate cluster nodes from admin service peers
2721    let peers = state.admin.get_peers();
2722    let mut nodes = vec![aegis_updates::orchestrator::ClusterNode {
2723        node_id: state.config.node_id.clone(),
2724        name: state
2725            .config
2726            .node_name
2727            .clone()
2728            .unwrap_or_else(|| "self".to_string()),
2729        address: format!("http://{}:{}", state.config.host, state.config.port),
2730        role: "leader".to_string(),
2731    }];
2732    for peer in &peers {
2733        nodes.push(aegis_updates::orchestrator::ClusterNode {
2734            node_id: peer.id.clone(),
2735            name: peer.name.clone().unwrap_or_else(|| peer.id.clone()),
2736            address: peer.address.clone(),
2737            role: "follower".to_string(),
2738        });
2739    }
2740    state.update_orchestrator.set_cluster_nodes(nodes).await;
2741
2742    let plan = state
2743        .update_orchestrator
2744        .create_plan(request.version, request.binary_url, request.sha256)
2745        .await;
2746
2747    (
2748        StatusCode::CREATED,
2749        Json(serde_json::json!({
2750            "success": true,
2751            "plan": plan,
2752        })),
2753    )
2754}
2755
2756/// Execute an update plan.
2757#[derive(serde::Deserialize)]
2758pub struct ExecuteUpdateRequest {
2759    pub plan_id: String,
2760}
2761
2762pub async fn execute_update_plan(
2763    State(state): State<AppState>,
2764    Json(request): Json<ExecuteUpdateRequest>,
2765) -> impl IntoResponse {
2766    state
2767        .activity
2768        .log_system(&format!("Executing update plan {}", request.plan_id));
2769
2770    match state
2771        .update_orchestrator
2772        .execute_plan(&request.plan_id)
2773        .await
2774    {
2775        Ok(()) => (
2776            StatusCode::OK,
2777            Json(serde_json::json!({
2778                "success": true,
2779                "message": "Update completed successfully",
2780            })),
2781        ),
2782        Err(e) => (
2783            StatusCode::INTERNAL_SERVER_ERROR,
2784            Json(serde_json::json!({
2785                "success": false,
2786                "error": e.to_string(),
2787            })),
2788        ),
2789    }
2790}
2791
2792/// Get update plan status.
2793pub async fn get_update_status(
2794    State(state): State<AppState>,
2795    Path(plan_id): Path<String>,
2796) -> impl IntoResponse {
2797    match state.update_orchestrator.get_plan(&plan_id).await {
2798        Some(plan) => (
2799            StatusCode::OK,
2800            Json(serde_json::json!({
2801                "success": true,
2802                "plan": plan,
2803            })),
2804        ),
2805        None => (
2806            StatusCode::NOT_FOUND,
2807            Json(serde_json::json!({
2808                "success": false,
2809                "error": format!("Plan {} not found", plan_id),
2810            })),
2811        ),
2812    }
2813}
2814
2815/// List all update plans (history).
2816pub async fn list_update_plans(State(state): State<AppState>) -> impl IntoResponse {
2817    let plans = state.update_orchestrator.list_plans().await;
2818
2819    (
2820        StatusCode::OK,
2821        Json(serde_json::json!({
2822            "success": true,
2823            "plans": plans,
2824        })),
2825    )
2826}