Skip to main content

aimdb_core/remote/
handler.rs

1//! Connection handler for AimX protocol
2//!
3//! Handles individual client connections, including handshake, authentication,
4//! and protocol method dispatch.
5//!
6//! # Architecture: Event Funnel Pattern
7//!
8//! Subscriptions use a funnel pattern for clean event delivery:
9//! - Each subscription spawns a consumer task that reads from the record buffer
10//! - Consumer tasks send events to a shared mpsc channel (the "funnel")
11//! - A single writer task drains the funnel and writes events to the UnixStream
12//! - This ensures NDJSON line integrity and prevents write interleaving
13
14use crate::remote::{
15    AimxConfig, Event, HelloMessage, RecordMetadata, Request, Response, WelcomeMessage,
16};
17use crate::{AimDb, DbError, DbResult};
18
19#[cfg(feature = "std")]
20use std::collections::HashMap;
21#[cfg(feature = "std")]
22use std::sync::Arc;
23
24#[cfg(feature = "std")]
25use serde_json::json;
26#[cfg(feature = "std")]
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
28#[cfg(feature = "std")]
29use tokio::net::UnixStream;
30#[cfg(feature = "std")]
31use tokio::sync::mpsc;
32#[cfg(feature = "std")]
33use tokio::sync::oneshot;
34
35/// Handle for an active subscription
36///
37/// Tracks the state needed to manage a single subscription's lifecycle.
38#[cfg(feature = "std")]
39#[allow(dead_code)] // record_name used only in tracing feature
40struct SubscriptionHandle {
41    /// Unique subscription identifier (returned to client)
42    subscription_id: String,
43
44    /// Record name being subscribed to
45    record_name: String,
46
47    /// Signal to cancel this subscription
48    /// When sent, the consumer task will terminate
49    cancel_tx: oneshot::Sender<()>,
50}
51
52/// Connection state for managing subscriptions
53///
54/// Tracks all active subscriptions for a single client connection.
55#[cfg(feature = "std")]
56struct ConnectionState {
57    /// Active subscriptions by subscription_id
58    subscriptions: HashMap<String, SubscriptionHandle>,
59
60    /// Counter for generating unique subscription IDs
61    next_subscription_id: u64,
62
63    /// Event funnel: all subscription tasks send events here
64    /// This channel feeds the single writer task
65    event_tx: mpsc::UnboundedSender<Event>,
66
67    /// Per-record drain readers, created lazily on first record.drain call.
68    /// One drain reader per record, per connection.
69    drain_readers: HashMap<String, Box<dyn crate::buffer::JsonBufferReader + Send>>,
70}
71
72#[cfg(feature = "std")]
73impl ConnectionState {
74    /// Creates a new connection state
75    fn new(event_tx: mpsc::UnboundedSender<Event>) -> Self {
76        Self {
77            subscriptions: HashMap::new(),
78            next_subscription_id: 1,
79            event_tx,
80            drain_readers: HashMap::new(),
81        }
82    }
83
84    /// Generates a unique subscription ID for this connection
85    fn generate_subscription_id(&mut self) -> String {
86        let id = format!("sub-{}", self.next_subscription_id);
87        self.next_subscription_id += 1;
88        id
89    }
90
91    /// Adds a subscription to the connection state
92    fn add_subscription(&mut self, handle: SubscriptionHandle) {
93        self.subscriptions
94            .insert(handle.subscription_id.clone(), handle);
95    }
96
97    /// Removes and returns a subscription by ID
98    #[allow(dead_code)]
99    fn remove_subscription(&mut self, subscription_id: &str) -> Option<SubscriptionHandle> {
100        self.subscriptions.remove(subscription_id)
101    }
102
103    /// Cancels all active subscriptions
104    ///
105    /// Sends cancel signals to all subscription tasks and clears the map.
106    /// Called when the client disconnects.
107    async fn cancel_all_subscriptions(&mut self) {
108        #[cfg(feature = "tracing")]
109        tracing::info!(
110            "Canceling {} active subscriptions",
111            self.subscriptions.len()
112        );
113
114        for (_id, handle) in self.subscriptions.drain() {
115            // Send cancel signal (ignore if receiver already dropped)
116            let _ = handle.cancel_tx.send(());
117        }
118    }
119}
120
121/// Handles an incoming client connection
122///
123/// Processes the AimX protocol handshake and manages the client session.
124/// Implements the event funnel pattern for subscription event delivery.
125///
126/// # Architecture
127///
128/// ```text
129/// ┌─────────────────┐
130/// │ Subscription 1  │───┐
131/// │ Consumer Task   │   │
132/// └─────────────────┘   │
133///                       ├──► Event Funnel ───► select! loop ───► UnixStream
134/// ┌─────────────────┐   │     (mpsc)          (interleaved    
135/// │ Subscription 2  │───┘                      writes)
136/// │ Consumer Task   │
137/// └─────────────────┘
138/// ```
139///
140/// The main loop uses `tokio::select!` to interleave:
141/// - Reading requests from the stream
142/// - Writing events from subscriptions
143///
144/// This ensures both responses and events are written without blocking.
145///
146/// # Arguments
147/// * `db` - Database instance
148/// * `config` - Remote access configuration
149/// * `stream` - Unix domain socket stream
150///
151/// # Errors
152/// Returns error if handshake fails or stream operations error
153#[cfg(feature = "std")]
154pub async fn handle_connection<R>(
155    db: Arc<AimDb<R>>,
156    config: AimxConfig,
157    stream: UnixStream,
158) -> DbResult<()>
159where
160    R: crate::RuntimeAdapter + crate::Spawn + 'static,
161{
162    #[cfg(feature = "tracing")]
163    tracing::info!("New remote access connection established");
164
165    // Perform protocol handshake
166    let mut stream = match perform_handshake(stream, &config, &db).await {
167        Ok(stream) => stream,
168        Err(e) => {
169            #[cfg(feature = "tracing")]
170            tracing::warn!("Handshake failed: {}", e);
171            return Err(e);
172        }
173    };
174
175    #[cfg(feature = "tracing")]
176    tracing::info!("Handshake complete, client ready");
177
178    // Create event funnel: all subscription tasks will send events here
179    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<Event>();
180
181    // Initialize connection state
182    let mut conn_state = ConnectionState::new(event_tx);
183
184    // Main loop: interleave reading requests and writing events
185    loop {
186        let mut line = String::new();
187
188        tokio::select! {
189            // Handle incoming requests
190            read_result = stream.read_line(&mut line) => {
191                match read_result {
192                    Ok(0) => {
193                        // Client closed connection
194                        #[cfg(feature = "tracing")]
195                        tracing::info!("Client disconnected gracefully");
196                        break;
197                    }
198                    Ok(_) => {
199                        #[cfg(feature = "tracing")]
200                        tracing::debug!("Received request: {}", line.trim());
201
202                        // Parse request
203                        let request: Request = match serde_json::from_str(line.trim()) {
204                            Ok(req) => req,
205                            Err(e) => {
206                                #[cfg(feature = "tracing")]
207                                tracing::warn!("Failed to parse request: {}", e);
208
209                                // Send error response (use ID 0 if we can't parse the request)
210                                let error_response =
211                                    Response::error(0, "parse_error", format!("Invalid JSON: {}", e));
212                                if let Err(_e) = send_response(&mut stream, &error_response).await {
213                                    #[cfg(feature = "tracing")]
214                                    tracing::error!("Failed to send error response: {}", _e);
215                                    break;
216                                }
217                                continue;
218                            }
219                        };
220
221                        // Dispatch request to appropriate handler
222                        let response = handle_request(&db, &config, &mut conn_state, request).await;
223
224                        // Send response
225                        if let Err(_e) = send_response(&mut stream, &response).await {
226                            #[cfg(feature = "tracing")]
227                            tracing::error!("Failed to send response: {}", _e);
228                            break;
229                        }
230                    }
231                    Err(_e) => {
232                        #[cfg(feature = "tracing")]
233                        tracing::error!("Error reading from stream: {}", _e);
234                        break;
235                    }
236                }
237            }
238
239            // Handle outgoing events from subscriptions
240            Some(event) = event_rx.recv() => {
241                if let Err(_e) = send_event(&mut stream, &event).await {
242                    #[cfg(feature = "tracing")]
243                    tracing::error!("Failed to send event: {}", _e);
244                    break;
245                }
246            }
247        }
248    }
249
250    // Cleanup: cancel all active subscriptions
251    conn_state.cancel_all_subscriptions().await;
252
253    #[cfg(feature = "tracing")]
254    tracing::info!("Connection handler terminating");
255
256    Ok(())
257}
258
259/// Sends an event to the client
260///
261/// Serializes the event to JSON and writes it to the stream with a newline.
262///
263/// # Arguments
264/// * `stream` - The connection stream
265/// * `event` - The event to send
266///
267/// # Errors
268/// Returns error if serialization or write fails
269#[cfg(feature = "std")]
270async fn send_event(stream: &mut BufReader<UnixStream>, event: &Event) -> DbResult<()> {
271    // Wrap event in protocol envelope
272    let event_msg = json!({ "event": event });
273
274    let event_json = serde_json::to_string(&event_msg).map_err(|e| DbError::JsonWithContext {
275        context: "Failed to serialize event".to_string(),
276        source: e,
277    })?;
278
279    stream
280        .get_mut()
281        .write_all(event_json.as_bytes())
282        .await
283        .map_err(|e| DbError::IoWithContext {
284            context: "Failed to write event".to_string(),
285            source: e,
286        })?;
287
288    stream
289        .get_mut()
290        .write_all(b"\n")
291        .await
292        .map_err(|e| DbError::IoWithContext {
293            context: "Failed to write event newline".to_string(),
294            source: e,
295        })?;
296
297    #[cfg(feature = "tracing")]
298    tracing::trace!("Sent event for subscription: {}", event.subscription_id);
299
300    Ok(())
301}
302
303/// Sends a response to the client
304///
305/// Serializes the response to JSON and writes it to the stream with a newline.
306///
307/// # Arguments
308/// * `stream` - The connection stream
309/// * `response` - The response to send
310///
311/// # Errors
312/// Returns error if serialization or write fails
313#[cfg(feature = "std")]
314async fn send_response(stream: &mut BufReader<UnixStream>, response: &Response) -> DbResult<()> {
315    let response_json = serde_json::to_string(response).map_err(|e| DbError::JsonWithContext {
316        context: "Failed to serialize response".to_string(),
317        source: e,
318    })?;
319
320    stream
321        .get_mut()
322        .write_all(response_json.as_bytes())
323        .await
324        .map_err(|e| DbError::IoWithContext {
325            context: "Failed to write response".to_string(),
326            source: e,
327        })?;
328
329    stream
330        .get_mut()
331        .write_all(b"\n")
332        .await
333        .map_err(|e| DbError::IoWithContext {
334            context: "Failed to write response newline".to_string(),
335            source: e,
336        })?;
337
338    #[cfg(feature = "tracing")]
339    tracing::debug!("Sent response");
340
341    Ok(())
342}
343
344/// Performs the AimX protocol handshake
345///
346/// Handshake flow:
347/// 1. Client sends HelloMessage with protocol version
348/// 2. Server validates version compatibility
349/// 3. Server sends WelcomeMessage with accepted version
350/// 4. Optional: Authenticate with token
351///
352/// # Arguments
353/// * `stream` - Unix domain socket stream
354/// * `config` - Remote access configuration
355/// * `db` - Database instance (for querying writable records)
356///
357/// # Returns
358/// `BufReader<UnixStream>` if handshake succeeds
359///
360/// # Errors
361/// Returns error if:
362/// - Protocol version incompatible
363/// - Authentication fails
364/// - IO error during handshake
365#[cfg(feature = "std")]
366async fn perform_handshake<R>(
367    stream: UnixStream,
368    config: &AimxConfig,
369    db: &Arc<AimDb<R>>,
370) -> DbResult<BufReader<UnixStream>>
371where
372    R: crate::RuntimeAdapter + crate::Spawn + 'static,
373{
374    let (reader, mut writer) = stream.into_split();
375    let mut reader = BufReader::new(reader);
376
377    // Read Hello message from client
378    let mut line = String::new();
379    reader
380        .read_line(&mut line)
381        .await
382        .map_err(|e| DbError::IoWithContext {
383            context: "Failed to read Hello message".to_string(),
384            source: e,
385        })?;
386
387    #[cfg(feature = "tracing")]
388    tracing::debug!("Received handshake: {}", line.trim());
389
390    // Parse Hello message
391    let hello: HelloMessage =
392        serde_json::from_str(line.trim()).map_err(|e| DbError::JsonWithContext {
393            context: "Failed to parse Hello message".to_string(),
394            source: e,
395        })?;
396
397    #[cfg(feature = "tracing")]
398    tracing::debug!(
399        "Client hello: version={}, client={}",
400        hello.version,
401        hello.client
402    );
403
404    // Version validation: accept "1.0" or "1"
405    if hello.version != "1.0" && hello.version != "1" {
406        let error_msg = format!(
407            r#"{{"error":"unsupported_version","message":"Server supports version 1.0, client requested {}"}}"#,
408            hello.version
409        );
410
411        #[cfg(feature = "tracing")]
412        tracing::warn!("Unsupported version: {}", hello.version);
413
414        let _ = writer.write_all(error_msg.as_bytes()).await;
415        let _ = writer.write_all(b"\n").await;
416        let _ = writer.shutdown().await;
417
418        return Err(DbError::InvalidOperation {
419            operation: "handshake".to_string(),
420            reason: format!("Unsupported version: {}", hello.version),
421        });
422    }
423
424    // Check authentication if required
425    let authenticated = if let Some(expected_token) = &config.auth_token {
426        match &hello.auth_token {
427            Some(provided_token) if provided_token == expected_token => {
428                #[cfg(feature = "tracing")]
429                tracing::debug!("Authentication successful");
430                true
431            }
432            Some(_) => {
433                let error_msg =
434                    r#"{"error":"authentication_failed","message":"Invalid auth token"}"#;
435
436                #[cfg(feature = "tracing")]
437                tracing::warn!("Authentication failed: invalid token");
438
439                let _ = writer.write_all(error_msg.as_bytes()).await;
440                let _ = writer.write_all(b"\n").await;
441                let _ = writer.shutdown().await;
442
443                return Err(DbError::PermissionDenied {
444                    operation: "authentication".to_string(),
445                });
446            }
447            None => {
448                let error_msg =
449                    r#"{"error":"authentication_required","message":"Auth token required"}"#;
450
451                #[cfg(feature = "tracing")]
452                tracing::warn!("Authentication failed: no token provided");
453
454                let _ = writer.write_all(error_msg.as_bytes()).await;
455                let _ = writer.write_all(b"\n").await;
456                let _ = writer.shutdown().await;
457
458                return Err(DbError::PermissionDenied {
459                    operation: "authentication".to_string(),
460                });
461            }
462        }
463    } else {
464        false
465    };
466
467    // Determine permissions based on security policy
468    let permissions = match &config.security_policy {
469        crate::remote::SecurityPolicy::ReadOnly => vec!["read".to_string()],
470        crate::remote::SecurityPolicy::ReadWrite { .. } => {
471            vec!["read".to_string(), "write".to_string()]
472        }
473    };
474
475    // Get writable records by querying database for writable record names
476    let writable_records = match &config.security_policy {
477        crate::remote::SecurityPolicy::ReadOnly => vec![],
478        crate::remote::SecurityPolicy::ReadWrite {
479            writable_records: _writable_type_ids,
480        } => {
481            // Get all records from database
482            let all_records: Vec<RecordMetadata> = db.list_records();
483
484            // Filter to those that are marked writable
485            all_records
486                .into_iter()
487                .filter(|meta| meta.writable)
488                .map(|meta| meta.name)
489                .collect()
490        }
491    };
492
493    // Send Welcome message
494    let welcome = WelcomeMessage {
495        version: "1.0".to_string(),
496        server: "aimdb".to_string(),
497        permissions,
498        writable_records,
499        max_subscriptions: Some(config.subscription_queue_size),
500        authenticated: Some(authenticated),
501    };
502
503    let welcome_json = serde_json::to_string(&welcome).map_err(|e| DbError::JsonWithContext {
504        context: "Failed to serialize Welcome message".to_string(),
505        source: e,
506    })?;
507
508    writer
509        .write_all(welcome_json.as_bytes())
510        .await
511        .map_err(|e| DbError::IoWithContext {
512            context: "Failed to write Welcome message".to_string(),
513            source: e,
514        })?;
515
516    writer
517        .write_all(b"\n")
518        .await
519        .map_err(|e| DbError::IoWithContext {
520            context: "Failed to write Welcome newline".to_string(),
521            source: e,
522        })?;
523
524    #[cfg(feature = "tracing")]
525    tracing::info!("Sent Welcome message to client");
526
527    // Reunite the stream
528    let stream = reader
529        .into_inner()
530        .reunite(writer)
531        .map_err(|e| DbError::Io {
532            source: std::io::Error::other(e.to_string()),
533        })?;
534
535    Ok(BufReader::new(stream))
536}
537
538/// Handles a single request and returns a response
539///
540/// Dispatches to the appropriate handler based on the request method.
541///
542/// # Arguments
543/// * `db` - Database instance
544/// * `config` - Remote access configuration
545/// * `conn_state` - Connection state (for subscription management)
546/// * `request` - The parsed request
547///
548/// # Returns
549/// Response to send to the client
550#[cfg(feature = "std")]
551async fn handle_request<R>(
552    db: &Arc<AimDb<R>>,
553    config: &AimxConfig,
554    conn_state: &mut ConnectionState,
555    request: Request,
556) -> Response
557where
558    R: crate::RuntimeAdapter + crate::Spawn + 'static,
559{
560    #[cfg(feature = "tracing")]
561    tracing::debug!(
562        "Handling request: method={}, id={}",
563        request.method,
564        request.id
565    );
566
567    match request.method.as_str() {
568        "record.list" => handle_record_list(db, config, request.id).await,
569        "record.get" => handle_record_get(db, config, request.id, request.params).await,
570        "record.set" => handle_record_set(db, config, request.id, request.params).await,
571        "record.subscribe" => {
572            handle_record_subscribe(db, config, conn_state, request.id, request.params).await
573        }
574        "record.unsubscribe" => {
575            handle_record_unsubscribe(conn_state, request.id, request.params).await
576        }
577        "record.drain" => handle_record_drain(db, conn_state, request.id, request.params).await,
578        "record.query" => handle_record_query(db, request.id, request.params).await,
579        "graph.nodes" => handle_graph_nodes(db, request.id).await,
580        "graph.edges" => handle_graph_edges(db, request.id).await,
581        "graph.topo_order" => handle_graph_topo_order(db, request.id).await,
582        #[cfg(feature = "profiling")]
583        "profiling.reset" => handle_profiling_reset(db, config, request.id).await,
584        #[cfg(feature = "metrics")]
585        "buffer_metrics.reset" => handle_buffer_metrics_reset(db, config, request.id).await,
586        _ => {
587            #[cfg(feature = "tracing")]
588            tracing::warn!("Unknown method: {}", request.method);
589
590            Response::error(
591                request.id,
592                "method_not_found",
593                format!("Unknown method: {}", request.method),
594            )
595        }
596    }
597}
598
599/// Handles record.list method
600///
601/// Returns metadata for all registered records in the database.
602///
603/// # Arguments
604/// * `db` - Database instance
605/// * `config` - Remote access configuration (for permission checks)
606/// * `request_id` - Request ID for the response
607///
608/// # Returns
609/// Success response with array of RecordMetadata
610#[cfg(feature = "std")]
611async fn handle_record_list<R>(
612    db: &Arc<AimDb<R>>,
613    _config: &AimxConfig,
614    request_id: u64,
615) -> Response
616where
617    R: crate::RuntimeAdapter + crate::Spawn + 'static,
618{
619    #[cfg(feature = "tracing")]
620    tracing::debug!("Listing records");
621
622    // Get all record metadata from database
623    let records: Vec<RecordMetadata> = db.list_records();
624
625    #[cfg(feature = "tracing")]
626    tracing::debug!("Found {} records", records.len());
627
628    // Convert to JSON and return
629    Response::success(request_id, json!(records))
630}
631
632/// Handles profiling.reset method
633///
634/// Clears stage profiling counters for every record. Requires write permission.
635#[cfg(all(feature = "std", feature = "profiling"))]
636async fn handle_profiling_reset<R>(
637    db: &Arc<AimDb<R>>,
638    config: &AimxConfig,
639    request_id: u64,
640) -> Response
641where
642    R: crate::RuntimeAdapter + crate::Spawn + 'static,
643{
644    if matches!(
645        config.security_policy,
646        crate::remote::SecurityPolicy::ReadOnly
647    ) {
648        return Response::error(
649            request_id,
650            "permission_denied",
651            "profiling.reset requires write permission (ReadOnly security policy)".to_string(),
652        );
653    }
654
655    db.reset_stage_profiling();
656
657    #[cfg(feature = "tracing")]
658    tracing::info!("Stage profiling counters reset");
659
660    Response::success(request_id, json!({ "reset": true }))
661}
662
663/// Handles buffer_metrics.reset method
664///
665/// Clears buffer introspection counters for every record. Requires write permission.
666#[cfg(all(feature = "std", feature = "metrics"))]
667async fn handle_buffer_metrics_reset<R>(
668    db: &Arc<AimDb<R>>,
669    config: &AimxConfig,
670    request_id: u64,
671) -> Response
672where
673    R: crate::RuntimeAdapter + crate::Spawn + 'static,
674{
675    if matches!(
676        config.security_policy,
677        crate::remote::SecurityPolicy::ReadOnly
678    ) {
679        return Response::error(
680            request_id,
681            "permission_denied",
682            "buffer_metrics.reset requires write permission (ReadOnly security policy)".to_string(),
683        );
684    }
685
686    db.reset_buffer_metrics();
687
688    #[cfg(feature = "tracing")]
689    tracing::info!("Buffer metrics counters reset");
690
691    Response::success(request_id, json!({ "reset": true }))
692}
693
694/// Handles record.get method
695///
696/// Returns the current value of a record as JSON.
697///
698/// # Arguments
699/// * `db` - Database instance
700/// * `config` - Remote access configuration (for permission checks)
701/// * `request_id` - Request ID for the response
702/// * `params` - Request parameters (must contain "record" field with record name)
703///
704/// # Returns
705/// Success response with record value as JSON, or error if:
706/// - Missing/invalid "record" parameter
707/// - Record not found
708/// - Record not configured with `.with_remote_access()`
709/// - No value available in atomic snapshot
710#[cfg(feature = "std")]
711async fn handle_record_get<R>(
712    db: &Arc<AimDb<R>>,
713    _config: &AimxConfig,
714    request_id: u64,
715    params: Option<serde_json::Value>,
716) -> Response
717where
718    R: crate::RuntimeAdapter + crate::Spawn + 'static,
719{
720    // Extract record name from params
721    let record_name = match params {
722        Some(serde_json::Value::Object(map)) => match map.get("record") {
723            Some(serde_json::Value::String(name)) => name.clone(),
724            _ => {
725                #[cfg(feature = "tracing")]
726                tracing::warn!("Missing or invalid 'record' parameter");
727
728                return Response::error(
729                    request_id,
730                    "invalid_params",
731                    "Missing or invalid 'record' parameter".to_string(),
732                );
733            }
734        },
735        _ => {
736            #[cfg(feature = "tracing")]
737            tracing::warn!("Missing params object");
738
739            return Response::error(
740                request_id,
741                "invalid_params",
742                "Missing params object".to_string(),
743            );
744        }
745    };
746
747    #[cfg(feature = "tracing")]
748    tracing::debug!("Getting value for record: {}", record_name);
749
750    // Try to peek the record's JSON value
751    match db.try_latest_as_json(&record_name) {
752        Some(value) => {
753            #[cfg(feature = "tracing")]
754            tracing::debug!("Successfully retrieved value for {}", record_name);
755
756            Response::success(request_id, value)
757        }
758        None => {
759            #[cfg(feature = "tracing")]
760            tracing::warn!("No value available for record: {}", record_name);
761
762            Response::error(
763                request_id,
764                "not_found",
765                format!("No value available for record: {}", record_name),
766            )
767        }
768    }
769}
770
771/// Handles record.set method
772///
773/// Sets a record value from JSON (write operation).
774///
775/// **SAFETY:** Enforces the "No Producer Override" rule:
776/// - Only allows writes to configuration records (producer_count == 0)
777/// - Prevents remote access from interfering with application logic
778///
779/// # Arguments
780/// * `db` - Database instance
781/// * `config` - Remote access configuration (for permission checks)
782/// * `request_id` - Request ID for the response
783/// * `params` - Request parameters (must contain "name" and "value" fields)
784///
785/// # Returns
786/// Success response, or error if:
787/// - Missing/invalid parameters
788/// - Record not found
789/// - Permission denied (not writable or has active producers)
790/// - Deserialization failed
791#[cfg(feature = "std")]
792async fn handle_record_set<R>(
793    db: &Arc<AimDb<R>>,
794    config: &AimxConfig,
795    request_id: u64,
796    params: Option<serde_json::Value>,
797) -> Response
798where
799    R: crate::RuntimeAdapter + crate::Spawn + 'static,
800{
801    use crate::remote::SecurityPolicy;
802
803    // Check if write operations are allowed
804    let writable_records = match &config.security_policy {
805        SecurityPolicy::ReadOnly => {
806            #[cfg(feature = "tracing")]
807            tracing::warn!("record.set called but security policy is ReadOnly");
808
809            return Response::error(
810                request_id,
811                "permission_denied",
812                "Write operations not allowed (ReadOnly security policy)".to_string(),
813            );
814        }
815        SecurityPolicy::ReadWrite { writable_records } => writable_records,
816    };
817
818    // Extract record name and value from params
819    let (record_name, value) = match params {
820        Some(serde_json::Value::Object(ref map)) => {
821            let name = match map.get("name") {
822                Some(serde_json::Value::String(n)) => n.clone(),
823                _ => {
824                    #[cfg(feature = "tracing")]
825                    tracing::warn!("Missing or invalid 'name' parameter in record.set");
826
827                    return Response::error(
828                        request_id,
829                        "invalid_params",
830                        "Missing or invalid 'name' parameter (expected string)".to_string(),
831                    );
832                }
833            };
834
835            let val = match map.get("value") {
836                Some(v) => v.clone(),
837                None => {
838                    #[cfg(feature = "tracing")]
839                    tracing::warn!("Missing 'value' parameter in record.set");
840
841                    return Response::error(
842                        request_id,
843                        "invalid_params",
844                        "Missing 'value' parameter".to_string(),
845                    );
846                }
847            };
848
849            (name, val)
850        }
851        _ => {
852            #[cfg(feature = "tracing")]
853            tracing::warn!("Missing params object in record.set");
854
855            return Response::error(
856                request_id,
857                "invalid_params",
858                "Missing params object".to_string(),
859            );
860        }
861    };
862
863    #[cfg(feature = "tracing")]
864    tracing::debug!("Setting value for record: {}", record_name);
865
866    // Check if record is in the writable_records set (using record key)
867    if !writable_records.contains(&record_name) {
868        #[cfg(feature = "tracing")]
869        tracing::warn!("Record '{}' not in writable_records set", record_name);
870
871        return Response::error(
872            request_id,
873            "permission_denied",
874            format!(
875                "Record '{}' is not writable. \
876                 Configure with .with_writable_record() to allow writes.",
877                record_name
878            ),
879        );
880    }
881
882    // Attempt to set the value
883    // This will enforce the "no producer override" rule internally
884    match db.set_record_from_json(&record_name, value) {
885        Ok(()) => {
886            #[cfg(feature = "tracing")]
887            tracing::info!("Successfully set value for record: {}", record_name);
888
889            // Get the updated value to return in response
890            let result = if let Some(updated_value) = db.try_latest_as_json(&record_name) {
891                serde_json::json!({
892                    "status": "success",
893                    "value": updated_value,
894                })
895            } else {
896                serde_json::json!({
897                    "status": "success",
898                })
899            };
900
901            Response::success(request_id, result)
902        }
903        Err(e) => {
904            #[cfg(feature = "tracing")]
905            tracing::error!("Failed to set value for record '{}': {}", record_name, e);
906
907            // Map internal errors to appropriate response codes
908            let (code, message) = match e {
909                crate::DbError::RecordKeyNotFound { key } => {
910                    ("not_found", format!("Record '{}' not found", key))
911                }
912                crate::DbError::PermissionDenied { operation } => {
913                    // This is the "has active producers" error
914                    ("permission_denied", operation)
915                }
916                crate::DbError::JsonWithContext { context, .. } => (
917                    "validation_error",
918                    format!("JSON validation failed: {}", context),
919                ),
920                crate::DbError::RuntimeError { message } => ("internal_error", message),
921                _ => ("internal_error", format!("Failed to set value: {}", e)),
922            };
923
924            Response::error(request_id, code, message)
925        }
926    }
927}
928
929/// Handles record.subscribe method
930///
931/// Subscribes to live updates for a record.
932///
933/// # Arguments
934/// * `db` - Database instance
935/// * `config` - Remote access configuration
936/// * `conn_state` - Connection state (for subscription tracking)
937/// * `request_id` - Request ID for the response
938/// * `params` - Request parameters (must contain "name" field with record name)
939///
940/// # Returns
941/// Success response with subscription_id and queue_size, or error if:
942/// - Missing/invalid parameters
943/// - Record not found
944/// - Too many subscriptions
945#[cfg(feature = "std")]
946async fn handle_record_subscribe<R>(
947    db: &Arc<AimDb<R>>,
948    config: &AimxConfig,
949    conn_state: &mut ConnectionState,
950    request_id: u64,
951    params: Option<serde_json::Value>,
952) -> Response
953where
954    R: crate::RuntimeAdapter + crate::Spawn + 'static,
955{
956    // Extract record name from params
957    let record_name = match params {
958        Some(serde_json::Value::Object(ref map)) => match map.get("name") {
959            Some(serde_json::Value::String(name)) => name.clone(),
960            _ => {
961                #[cfg(feature = "tracing")]
962                tracing::warn!("Missing or invalid 'name' parameter in record.subscribe");
963
964                return Response::error(
965                    request_id,
966                    "invalid_params",
967                    "Missing or invalid 'name' parameter (expected string)".to_string(),
968                );
969            }
970        },
971        _ => {
972            #[cfg(feature = "tracing")]
973            tracing::warn!("Missing params object in record.subscribe");
974
975            return Response::error(
976                request_id,
977                "invalid_params",
978                "Missing params object".to_string(),
979            );
980        }
981    };
982
983    // Optional: send_initial flag (default true)
984    let _send_initial = params
985        .as_ref()
986        .and_then(|p| p.as_object())
987        .and_then(|map| map.get("send_initial"))
988        .and_then(|v| v.as_bool())
989        .unwrap_or(true);
990
991    #[cfg(feature = "tracing")]
992    tracing::debug!("Subscribing to record: {}", record_name);
993
994    // Check max subscriptions limit
995    if conn_state.subscriptions.len() >= config.subscription_queue_size {
996        #[cfg(feature = "tracing")]
997        tracing::warn!(
998            "Too many subscriptions: {} (max: {})",
999            conn_state.subscriptions.len(),
1000            config.subscription_queue_size
1001        );
1002
1003        return Response::error(
1004            request_id,
1005            "too_many_subscriptions",
1006            format!(
1007                "Maximum subscriptions reached: {}",
1008                config.subscription_queue_size
1009            ),
1010        );
1011    }
1012
1013    // Generate unique subscription ID
1014    let subscription_id = conn_state.generate_subscription_id();
1015
1016    // Subscribe to record updates via the database API (using record key)
1017    let (value_rx, cancel_tx) =
1018        match db.subscribe_record_updates(&record_name, config.subscription_queue_size) {
1019            Ok(channels) => channels,
1020            Err(e) => {
1021                // Map internal errors to appropriate response codes
1022                let (code, message) = match &e {
1023                    crate::DbError::RecordKeyNotFound { key } => {
1024                        #[cfg(feature = "tracing")]
1025                        tracing::warn!("Record not found: {}", key);
1026                        ("not_found", format!("Record '{}' not found", key))
1027                    }
1028                    _ => {
1029                        #[cfg(feature = "tracing")]
1030                        tracing::error!("Failed to subscribe to record updates: {}", e);
1031                        ("internal_error", format!("Failed to subscribe: {}", e))
1032                    }
1033                };
1034
1035                return Response::error(request_id, code, message);
1036            }
1037        };
1038
1039    // Spawn event streaming task for this subscription
1040    let event_tx = conn_state.event_tx.clone();
1041    let sub_id_clone = subscription_id.clone();
1042    let stream_handle = tokio::spawn(async move {
1043        stream_subscription_events(sub_id_clone, value_rx, event_tx).await;
1044    });
1045
1046    // Store subscription handle
1047    let handle = SubscriptionHandle {
1048        subscription_id: subscription_id.clone(),
1049        record_name: record_name.clone(),
1050        cancel_tx,
1051    };
1052    conn_state.add_subscription(handle);
1053
1054    // Detach the streaming task (it will run until cancelled or channel closes)
1055    std::mem::drop(stream_handle);
1056
1057    #[cfg(feature = "tracing")]
1058    tracing::info!(
1059        "Created subscription {} for record {}",
1060        subscription_id,
1061        record_name
1062    );
1063
1064    // Return success response
1065    Response::success(
1066        request_id,
1067        json!({
1068            "subscription_id": subscription_id,
1069            "queue_size": config.subscription_queue_size,
1070        }),
1071    )
1072}
1073
1074/// Streams subscription events from value channel to event channel
1075///
1076/// Reads JSON values from the subscription's receiver and converts them
1077/// into Event messages with sequence numbers and timestamps.
1078///
1079/// # Arguments
1080/// * `subscription_id` - Unique subscription identifier
1081/// * `value_rx` - Receiver for JSON values from the database
1082/// * `event_tx` - Sender for Event messages to the client
1083#[cfg(feature = "std")]
1084async fn stream_subscription_events(
1085    subscription_id: String,
1086    mut value_rx: tokio::sync::mpsc::Receiver<serde_json::Value>,
1087    event_tx: tokio::sync::mpsc::UnboundedSender<Event>,
1088) {
1089    let mut sequence: u64 = 1;
1090
1091    #[cfg(feature = "tracing")]
1092    tracing::debug!(
1093        "Event streaming task started for subscription: {}",
1094        subscription_id
1095    );
1096
1097    while let Some(json_value) = value_rx.recv().await {
1098        // Generate timestamp in "secs.nanosecs" format
1099        let duration = std::time::SystemTime::now()
1100            .duration_since(std::time::UNIX_EPOCH)
1101            .unwrap_or_default();
1102        let timestamp = format!("{}.{:09}", duration.as_secs(), duration.subsec_nanos());
1103
1104        // Create event
1105        let event = Event {
1106            subscription_id: subscription_id.clone(),
1107            sequence,
1108            data: json_value,
1109            timestamp,
1110            dropped: None, // TODO: Implement dropped event tracking
1111        };
1112
1113        // Send event to the funnel
1114        if event_tx.send(event).is_err() {
1115            #[cfg(feature = "tracing")]
1116            tracing::debug!(
1117                "Event channel closed, terminating stream for subscription: {}",
1118                subscription_id
1119            );
1120            break;
1121        }
1122
1123        sequence += 1;
1124    }
1125
1126    #[cfg(feature = "tracing")]
1127    tracing::debug!(
1128        "Event streaming task terminated for subscription: {}",
1129        subscription_id
1130    );
1131}
1132
1133/// Handles record.unsubscribe method
1134///
1135/// Cancels an active subscription.
1136///
1137/// # Arguments
1138/// * `conn_state` - Connection state (for subscription tracking)
1139/// * `request_id` - Request ID for the response
1140/// * `params` - Request parameters (must contain "subscription_id" field)
1141///
1142/// # Returns
1143/// Success response, or error if subscription not found
1144#[cfg(feature = "std")]
1145async fn handle_record_unsubscribe(
1146    conn_state: &mut ConnectionState,
1147    request_id: u64,
1148    params: Option<serde_json::Value>,
1149) -> Response {
1150    // Parse subscription_id parameter
1151    let subscription_id = match params {
1152        Some(serde_json::Value::Object(ref map)) => match map.get("subscription_id") {
1153            Some(serde_json::Value::String(id)) => id.clone(),
1154            _ => {
1155                return Response::error(
1156                    request_id,
1157                    "invalid_params",
1158                    "Missing or invalid 'subscription_id' parameter".to_string(),
1159                )
1160            }
1161        },
1162        _ => {
1163            return Response::error(
1164                request_id,
1165                "invalid_params",
1166                "Missing 'subscription_id' parameter".to_string(),
1167            )
1168        }
1169    };
1170
1171    #[cfg(feature = "tracing")]
1172    tracing::debug!("Unsubscribing from subscription_id: {}", subscription_id);
1173
1174    // Look up and remove the subscription
1175    match conn_state.subscriptions.remove(&subscription_id) {
1176        Some(handle) => {
1177            // Send cancellation signal to the streaming task
1178            // It's okay if this fails (task may have already terminated)
1179            let _ = handle.cancel_tx.send(());
1180
1181            #[cfg(feature = "tracing")]
1182            tracing::debug!(
1183                "Cancelled subscription {} for record {}",
1184                subscription_id,
1185                handle.record_name
1186            );
1187
1188            Response::success(
1189                request_id,
1190                serde_json::json!({
1191                    "subscription_id": subscription_id,
1192                    "status": "cancelled"
1193                }),
1194            )
1195        }
1196        None => {
1197            #[cfg(feature = "tracing")]
1198            tracing::warn!("Subscription not found: {}", subscription_id);
1199
1200            Response::error(
1201                request_id,
1202                "not_found",
1203                format!("Subscription '{}' not found", subscription_id),
1204            )
1205        }
1206    }
1207}
1208
1209/// Handles record.drain method
1210///
1211/// Drains all pending values from a record's drain reader. On the first call for
1212/// a given record, creates a dedicated drain reader (returns empty). Subsequent
1213/// calls return all values accumulated since the previous drain.
1214///
1215/// # Arguments
1216/// * `db` - Database instance
1217/// * `conn_state` - Connection state (for drain reader management)
1218/// * `request_id` - Request ID for the response
1219/// * `params` - Request parameters (must contain "name" field, optional "limit")
1220///
1221/// # Returns
1222/// Success response with `record_name`, `values` array, and `count`, or error if:
1223/// - Missing/invalid parameters
1224/// - Record not found
1225/// - Record not configured with `.with_remote_access()`
1226#[cfg(feature = "std")]
1227async fn handle_record_drain<R>(
1228    db: &Arc<AimDb<R>>,
1229    conn_state: &mut ConnectionState,
1230    request_id: u64,
1231    params: Option<serde_json::Value>,
1232) -> Response
1233where
1234    R: crate::RuntimeAdapter + crate::Spawn + 'static,
1235{
1236    // Extract record name from params
1237    let record_name = match params {
1238        Some(serde_json::Value::Object(ref map)) => match map.get("name") {
1239            Some(serde_json::Value::String(name)) => name.clone(),
1240            _ => {
1241                return Response::error(
1242                    request_id,
1243                    "invalid_params",
1244                    "Missing or invalid 'name' parameter (expected string)".to_string(),
1245                );
1246            }
1247        },
1248        _ => {
1249            return Response::error(
1250                request_id,
1251                "invalid_params",
1252                "Missing params object".to_string(),
1253            );
1254        }
1255    };
1256
1257    // Optional: limit parameter
1258    // Use try_from instead of `as` to avoid silent truncation on 32-bit targets
1259    // (values that don't fit in usize are treated as "no limit").
1260    let limit = params
1261        .as_ref()
1262        .and_then(|p| p.as_object())
1263        .and_then(|map| map.get("limit"))
1264        .and_then(|v| v.as_u64())
1265        .map(|v| usize::try_from(v).unwrap_or(usize::MAX))
1266        .unwrap_or(usize::MAX);
1267
1268    #[cfg(feature = "tracing")]
1269    tracing::debug!(
1270        "Draining record: {} (limit: {})",
1271        record_name,
1272        if limit == usize::MAX {
1273            "all".to_string()
1274        } else {
1275            limit.to_string()
1276        }
1277    );
1278
1279    // Lazily create drain reader on first call for this record
1280    if !conn_state.drain_readers.contains_key(&record_name) {
1281        // Resolve record key → RecordId → AnyRecord → subscribe_json()
1282        let id = match db.inner().resolve_str(&record_name) {
1283            Some(id) => id,
1284            None => {
1285                return Response::error(
1286                    request_id,
1287                    "not_found",
1288                    format!("Record '{}' not found", record_name),
1289                );
1290            }
1291        };
1292
1293        let record = match db.inner().storage(id) {
1294            Some(r) => r,
1295            None => {
1296                return Response::error(
1297                    request_id,
1298                    "not_found",
1299                    format!("Record '{}' storage not found", record_name),
1300                );
1301            }
1302        };
1303
1304        let reader = match record.subscribe_json() {
1305            Ok(r) => r,
1306            Err(e) => {
1307                return Response::error(
1308                    request_id,
1309                    "remote_access_not_enabled",
1310                    format!(
1311                        "Record '{}' not configured with .with_remote_access(): {}",
1312                        record_name, e
1313                    ),
1314                );
1315            }
1316        };
1317
1318        conn_state.drain_readers.insert(record_name.clone(), reader);
1319    }
1320
1321    // Drain all pending values from the reader
1322    let reader = conn_state.drain_readers.get_mut(&record_name).unwrap();
1323    let mut values = Vec::new();
1324
1325    loop {
1326        if values.len() >= limit {
1327            break;
1328        }
1329        match reader.try_recv_json() {
1330            Ok(val) => values.push(val),
1331            Err(DbError::BufferEmpty) => break,
1332            Err(DbError::BufferLagged { .. }) => {
1333                // Ring overflowed since last drain — cursor resets.
1334                // Log warning, keep draining.
1335                #[cfg(feature = "tracing")]
1336                tracing::warn!(
1337                    "Drain reader lagged for record '{}' — some values were lost",
1338                    record_name
1339                );
1340                continue;
1341            }
1342            Err(_) => break,
1343        }
1344    }
1345
1346    let count = values.len();
1347
1348    #[cfg(feature = "tracing")]
1349    tracing::debug!("Drained {} values from record '{}'", count, record_name);
1350
1351    Response::success(
1352        request_id,
1353        json!({
1354            "record_name": record_name,
1355            "values": values,
1356            "count": count,
1357        }),
1358    )
1359}
1360
1361// ============================================================================
1362// Persistence Query (record.query)
1363// ============================================================================
1364
1365/// Type-erased query handler registered by `aimdb-persistence` via Extensions.
1366///
1367/// This keeps `aimdb-core` free of persistence-specific imports. The handler is
1368/// a boxed async function that accepts query parameters (record pattern, limit,
1369/// start/end timestamps) and returns a JSON value with the results.
1370///
1371/// Registered by `aimdb_persistence` via the `with_persistence()` builder extension.
1372pub type QueryHandlerFn = Box<
1373    dyn Fn(
1374            QueryHandlerParams,
1375        ) -> core::pin::Pin<
1376            Box<dyn core::future::Future<Output = Result<serde_json::Value, String>> + Send>,
1377        > + Send
1378        + Sync,
1379>;
1380
1381/// Parameters for the type-erased query handler.
1382#[derive(Debug, Clone)]
1383pub struct QueryHandlerParams {
1384    /// Record pattern (supports `*` wildcard).
1385    pub name: String,
1386    /// Maximum results per matching record.
1387    pub limit: Option<usize>,
1388    /// Optional start timestamp (Unix ms).
1389    pub start: Option<u64>,
1390    /// Optional end timestamp (Unix ms).
1391    pub end: Option<u64>,
1392}
1393
1394/// Handles `record.query` method.
1395///
1396/// Delegates to a [`QueryHandlerFn`] stored in the database's `Extensions`
1397/// TypeMap. If no handler is registered (i.e. persistence is not configured),
1398/// returns an error.
1399#[cfg(feature = "std")]
1400async fn handle_record_query<R>(
1401    db: &Arc<AimDb<R>>,
1402    request_id: u64,
1403    params: Option<serde_json::Value>,
1404) -> Response
1405where
1406    R: crate::RuntimeAdapter + crate::Spawn + 'static,
1407{
1408    // Extract the query handler from Extensions.
1409    let handler = match db.extensions().get::<QueryHandlerFn>() {
1410        Some(h) => h,
1411        None => {
1412            return Response::error(
1413                request_id,
1414                "not_configured",
1415                "Persistence not configured. Call .with_persistence() on the builder.".to_string(),
1416            );
1417        }
1418    };
1419
1420    // Parse parameters
1421    let (name, limit, start, end) = match &params {
1422        Some(serde_json::Value::Object(map)) => {
1423            let name = map
1424                .get("name")
1425                .and_then(|v| v.as_str())
1426                .unwrap_or("*")
1427                .to_string();
1428            let limit = map
1429                .get("limit")
1430                .and_then(|v| v.as_u64())
1431                .and_then(|v| usize::try_from(v).ok());
1432            let start = map.get("start").and_then(|v| v.as_u64());
1433            let end = map.get("end").and_then(|v| v.as_u64());
1434            (name, limit, start, end)
1435        }
1436        _ => ("*".to_string(), None, None, None),
1437    };
1438
1439    let query_params = QueryHandlerParams {
1440        name,
1441        limit,
1442        start,
1443        end,
1444    };
1445
1446    match handler(query_params).await {
1447        Ok(result) => Response::success(request_id, result),
1448        Err(msg) => Response::error(request_id, "query_error", msg),
1449    }
1450}
1451
1452// ============================================================================
1453// Graph Introspection Methods
1454// ============================================================================
1455
1456/// Handles graph.nodes method
1457///
1458/// Returns all nodes in the dependency graph with their metadata.
1459/// Each node represents a record with its origin, buffer type, and connections.
1460///
1461/// # Arguments
1462/// * `db` - Database instance
1463/// * `request_id` - Request ID for the response
1464///
1465/// # Returns
1466/// Success response with array of GraphNode objects:
1467/// - `key`: Record key (e.g., "temp.vienna")
1468/// - `origin`: How the record gets its values (source, link, transform, passive)
1469/// - `buffer_type`: Buffer type ("spmc_ring", "single_latest", "mailbox", "none")
1470/// - `buffer_capacity`: Optional buffer capacity
1471/// - `tap_count`: Number of taps attached
1472/// - `has_outbound_link`: Whether an outbound connector is configured
1473#[cfg(feature = "std")]
1474async fn handle_graph_nodes<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
1475where
1476    R: crate::RuntimeAdapter + crate::Spawn + 'static,
1477{
1478    #[cfg(feature = "tracing")]
1479    tracing::debug!("Getting dependency graph nodes");
1480
1481    let graph = db.inner().dependency_graph();
1482    let nodes = &graph.nodes;
1483
1484    #[cfg(feature = "tracing")]
1485    tracing::debug!("Returning {} graph nodes", nodes.len());
1486
1487    Response::success(request_id, json!(nodes))
1488}
1489
1490/// Handles graph.edges method
1491///
1492/// Returns all edges in the dependency graph representing data flow between records.
1493/// Edges are directed from source to target and include the edge type.
1494///
1495/// # Arguments
1496/// * `db` - Database instance
1497/// * `request_id` - Request ID for the response
1498///
1499/// # Returns
1500/// Success response with array of GraphEdge objects:
1501/// - `from`: Source record key
1502/// - `to`: Target record key
1503/// - `edge_type`: Type of connection (TransformInput, TransformJoinInput, etc.)
1504#[cfg(feature = "std")]
1505async fn handle_graph_edges<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
1506where
1507    R: crate::RuntimeAdapter + crate::Spawn + 'static,
1508{
1509    #[cfg(feature = "tracing")]
1510    tracing::debug!("Getting dependency graph edges");
1511
1512    let graph = db.inner().dependency_graph();
1513    let edges = &graph.edges;
1514
1515    #[cfg(feature = "tracing")]
1516    tracing::debug!("Returning {} graph edges", edges.len());
1517
1518    Response::success(request_id, json!(edges))
1519}
1520
1521/// Handles graph.topo_order method
1522///
1523/// Returns the topological ordering of records in the dependency graph.
1524/// This ordering ensures that all dependencies are processed before dependents.
1525/// Used for spawn ordering and understanding data flow.
1526///
1527/// # Arguments
1528/// * `db` - Database instance
1529/// * `request_id` - Request ID for the response
1530///
1531/// # Returns
1532/// Success response with array of record keys in topological order:
1533/// - Sources and passive records first
1534/// - Transform outputs after their inputs
1535/// - Respects the DAG structure for proper initialization order
1536#[cfg(feature = "std")]
1537async fn handle_graph_topo_order<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
1538where
1539    R: crate::RuntimeAdapter + crate::Spawn + 'static,
1540{
1541    #[cfg(feature = "tracing")]
1542    tracing::debug!("Getting topological order");
1543
1544    let graph = db.inner().dependency_graph();
1545    let topo_order = graph.topo_order();
1546
1547    #[cfg(feature = "tracing")]
1548    tracing::debug!(
1549        "Returning topological order with {} records",
1550        topo_order.len()
1551    );
1552
1553    Response::success(request_id, json!(topo_order))
1554}