Skip to main content

aimdb_core/remote/
handler.rs

1//! Connection handler for AimX protocol
2//!
3//! Handles individual client connections, including handshake, authentication,
4//! and protocol method dispatch.
5//!
6//! # Architecture: Event Funnel Pattern
7//!
8//! Subscriptions use a funnel pattern for clean event delivery:
9//! - Each subscription spawns a consumer task that reads from the record buffer
10//! - Consumer tasks send events to a shared mpsc channel (the "funnel")
11//! - A single writer task drains the funnel and writes events to the UnixStream
12//! - This ensures NDJSON line integrity and prevents write interleaving
13
14use crate::remote::{
15    AimxConfig, Event, HelloMessage, RecordMetadata, Request, Response, WelcomeMessage,
16};
17use crate::{AimDb, DbError, DbResult};
18
19#[cfg(feature = "std")]
20use std::collections::HashMap;
21#[cfg(feature = "std")]
22use std::sync::Arc;
23
24#[cfg(feature = "std")]
25use serde_json::json;
26#[cfg(feature = "std")]
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
28#[cfg(feature = "std")]
29use tokio::net::UnixStream;
30#[cfg(feature = "std")]
31use tokio::sync::mpsc;
32#[cfg(feature = "std")]
33use tokio::sync::oneshot;
34
35/// Handle for an active subscription
36///
37/// Tracks the state needed to manage a single subscription's lifecycle.
38#[cfg(feature = "std")]
39#[allow(dead_code)] // record_name used only in tracing feature
40struct SubscriptionHandle {
41    /// Unique subscription identifier (returned to client)
42    subscription_id: String,
43
44    /// Record name being subscribed to
45    record_name: String,
46
47    /// Signal to cancel this subscription
48    /// When sent, the consumer task will terminate
49    cancel_tx: oneshot::Sender<()>,
50}
51
52/// Connection state for managing subscriptions
53///
54/// Tracks all active subscriptions for a single client connection.
55#[cfg(feature = "std")]
56struct ConnectionState {
57    /// Active subscriptions by subscription_id
58    subscriptions: HashMap<String, SubscriptionHandle>,
59
60    /// Counter for generating unique subscription IDs
61    next_subscription_id: u64,
62
63    /// Event funnel: all subscription tasks send events here
64    /// This channel feeds the single writer task
65    event_tx: mpsc::UnboundedSender<Event>,
66
67    /// Per-record drain readers, created lazily on first record.drain call.
68    /// One drain reader per record, per connection.
69    drain_readers: HashMap<String, Box<dyn crate::buffer::JsonBufferReader + Send>>,
70}
71
72#[cfg(feature = "std")]
73impl ConnectionState {
74    /// Creates a new connection state
75    fn new(event_tx: mpsc::UnboundedSender<Event>) -> Self {
76        Self {
77            subscriptions: HashMap::new(),
78            next_subscription_id: 1,
79            event_tx,
80            drain_readers: HashMap::new(),
81        }
82    }
83
84    /// Generates a unique subscription ID for this connection
85    fn generate_subscription_id(&mut self) -> String {
86        let id = format!("sub-{}", self.next_subscription_id);
87        self.next_subscription_id += 1;
88        id
89    }
90
91    /// Adds a subscription to the connection state
92    fn add_subscription(&mut self, handle: SubscriptionHandle) {
93        self.subscriptions
94            .insert(handle.subscription_id.clone(), handle);
95    }
96
97    /// Removes and returns a subscription by ID
98    #[allow(dead_code)]
99    fn remove_subscription(&mut self, subscription_id: &str) -> Option<SubscriptionHandle> {
100        self.subscriptions.remove(subscription_id)
101    }
102
103    /// Cancels all active subscriptions
104    ///
105    /// Sends cancel signals to all subscription tasks and clears the map.
106    /// Called when the client disconnects.
107    async fn cancel_all_subscriptions(&mut self) {
108        #[cfg(feature = "tracing")]
109        tracing::info!(
110            "Canceling {} active subscriptions",
111            self.subscriptions.len()
112        );
113
114        for (_id, handle) in self.subscriptions.drain() {
115            // Send cancel signal (ignore if receiver already dropped)
116            let _ = handle.cancel_tx.send(());
117        }
118    }
119}
120
121/// Handles an incoming client connection
122///
123/// Processes the AimX protocol handshake and manages the client session.
124/// Implements the event funnel pattern for subscription event delivery.
125///
126/// # Architecture
127///
128/// ```text
129/// ┌─────────────────┐
130/// │ Subscription 1  │───┐
131/// │ Consumer Task   │   │
132/// └─────────────────┘   │
133///                       ├──► Event Funnel ───► select! loop ───► UnixStream
134/// ┌─────────────────┐   │     (mpsc)          (interleaved    
135/// │ Subscription 2  │───┘                      writes)
136/// │ Consumer Task   │
137/// └─────────────────┘
138/// ```
139///
140/// The main loop uses `tokio::select!` to interleave:
141/// - Reading requests from the stream
142/// - Writing events from subscriptions
143///
144/// This ensures both responses and events are written without blocking.
145///
146/// # Arguments
147/// * `db` - Database instance
148/// * `config` - Remote access configuration
149/// * `stream` - Unix domain socket stream
150///
151/// # Errors
152/// Returns error if handshake fails or stream operations error
153#[cfg(feature = "std")]
154pub async fn handle_connection<R>(
155    db: Arc<AimDb<R>>,
156    config: AimxConfig,
157    stream: UnixStream,
158) -> DbResult<()>
159where
160    R: crate::RuntimeAdapter + crate::Spawn + 'static,
161{
162    #[cfg(feature = "tracing")]
163    tracing::info!("New remote access connection established");
164
165    // Perform protocol handshake
166    let mut stream = match perform_handshake(stream, &config, &db).await {
167        Ok(stream) => stream,
168        Err(e) => {
169            #[cfg(feature = "tracing")]
170            tracing::warn!("Handshake failed: {}", e);
171            return Err(e);
172        }
173    };
174
175    #[cfg(feature = "tracing")]
176    tracing::info!("Handshake complete, client ready");
177
178    // Create event funnel: all subscription tasks will send events here
179    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<Event>();
180
181    // Initialize connection state
182    let mut conn_state = ConnectionState::new(event_tx);
183
184    // Main loop: interleave reading requests and writing events
185    loop {
186        let mut line = String::new();
187
188        tokio::select! {
189            // Handle incoming requests
190            read_result = stream.read_line(&mut line) => {
191                match read_result {
192                    Ok(0) => {
193                        // Client closed connection
194                        #[cfg(feature = "tracing")]
195                        tracing::info!("Client disconnected gracefully");
196                        break;
197                    }
198                    Ok(_) => {
199                        #[cfg(feature = "tracing")]
200                        tracing::debug!("Received request: {}", line.trim());
201
202                        // Parse request
203                        let request: Request = match serde_json::from_str(line.trim()) {
204                            Ok(req) => req,
205                            Err(e) => {
206                                #[cfg(feature = "tracing")]
207                                tracing::warn!("Failed to parse request: {}", e);
208
209                                // Send error response (use ID 0 if we can't parse the request)
210                                let error_response =
211                                    Response::error(0, "parse_error", format!("Invalid JSON: {}", e));
212                                if let Err(_e) = send_response(&mut stream, &error_response).await {
213                                    #[cfg(feature = "tracing")]
214                                    tracing::error!("Failed to send error response: {}", _e);
215                                    break;
216                                }
217                                continue;
218                            }
219                        };
220
221                        // Dispatch request to appropriate handler
222                        let response = handle_request(&db, &config, &mut conn_state, request).await;
223
224                        // Send response
225                        if let Err(_e) = send_response(&mut stream, &response).await {
226                            #[cfg(feature = "tracing")]
227                            tracing::error!("Failed to send response: {}", _e);
228                            break;
229                        }
230                    }
231                    Err(_e) => {
232                        #[cfg(feature = "tracing")]
233                        tracing::error!("Error reading from stream: {}", _e);
234                        break;
235                    }
236                }
237            }
238
239            // Handle outgoing events from subscriptions
240            Some(event) = event_rx.recv() => {
241                if let Err(_e) = send_event(&mut stream, &event).await {
242                    #[cfg(feature = "tracing")]
243                    tracing::error!("Failed to send event: {}", _e);
244                    break;
245                }
246            }
247        }
248    }
249
250    // Cleanup: cancel all active subscriptions
251    conn_state.cancel_all_subscriptions().await;
252
253    #[cfg(feature = "tracing")]
254    tracing::info!("Connection handler terminating");
255
256    Ok(())
257}
258
259/// Sends an event to the client
260///
261/// Serializes the event to JSON and writes it to the stream with a newline.
262///
263/// # Arguments
264/// * `stream` - The connection stream
265/// * `event` - The event to send
266///
267/// # Errors
268/// Returns error if serialization or write fails
269#[cfg(feature = "std")]
270async fn send_event(stream: &mut BufReader<UnixStream>, event: &Event) -> DbResult<()> {
271    // Wrap event in protocol envelope
272    let event_msg = json!({ "event": event });
273
274    let event_json = serde_json::to_string(&event_msg).map_err(|e| DbError::JsonWithContext {
275        context: "Failed to serialize event".to_string(),
276        source: e,
277    })?;
278
279    stream
280        .get_mut()
281        .write_all(event_json.as_bytes())
282        .await
283        .map_err(|e| DbError::IoWithContext {
284            context: "Failed to write event".to_string(),
285            source: e,
286        })?;
287
288    stream
289        .get_mut()
290        .write_all(b"\n")
291        .await
292        .map_err(|e| DbError::IoWithContext {
293            context: "Failed to write event newline".to_string(),
294            source: e,
295        })?;
296
297    #[cfg(feature = "tracing")]
298    tracing::trace!("Sent event for subscription: {}", event.subscription_id);
299
300    Ok(())
301}
302
303/// Sends a response to the client
304///
305/// Serializes the response to JSON and writes it to the stream with a newline.
306///
307/// # Arguments
308/// * `stream` - The connection stream
309/// * `response` - The response to send
310///
311/// # Errors
312/// Returns error if serialization or write fails
313#[cfg(feature = "std")]
314async fn send_response(stream: &mut BufReader<UnixStream>, response: &Response) -> DbResult<()> {
315    let response_json = serde_json::to_string(response).map_err(|e| DbError::JsonWithContext {
316        context: "Failed to serialize response".to_string(),
317        source: e,
318    })?;
319
320    stream
321        .get_mut()
322        .write_all(response_json.as_bytes())
323        .await
324        .map_err(|e| DbError::IoWithContext {
325            context: "Failed to write response".to_string(),
326            source: e,
327        })?;
328
329    stream
330        .get_mut()
331        .write_all(b"\n")
332        .await
333        .map_err(|e| DbError::IoWithContext {
334            context: "Failed to write response newline".to_string(),
335            source: e,
336        })?;
337
338    #[cfg(feature = "tracing")]
339    tracing::debug!("Sent response");
340
341    Ok(())
342}
343
344/// Performs the AimX protocol handshake
345///
346/// Handshake flow:
347/// 1. Client sends HelloMessage with protocol version
348/// 2. Server validates version compatibility
349/// 3. Server sends WelcomeMessage with accepted version
350/// 4. Optional: Authenticate with token
351///
352/// # Arguments
353/// * `stream` - Unix domain socket stream
354/// * `config` - Remote access configuration
355/// * `db` - Database instance (for querying writable records)
356///
357/// # Returns
358/// `BufReader<UnixStream>` if handshake succeeds
359///
360/// # Errors
361/// Returns error if:
362/// - Protocol version incompatible
363/// - Authentication fails
364/// - IO error during handshake
365#[cfg(feature = "std")]
366async fn perform_handshake<R>(
367    stream: UnixStream,
368    config: &AimxConfig,
369    db: &Arc<AimDb<R>>,
370) -> DbResult<BufReader<UnixStream>>
371where
372    R: crate::RuntimeAdapter + crate::Spawn + 'static,
373{
374    let (reader, mut writer) = stream.into_split();
375    let mut reader = BufReader::new(reader);
376
377    // Read Hello message from client
378    let mut line = String::new();
379    reader
380        .read_line(&mut line)
381        .await
382        .map_err(|e| DbError::IoWithContext {
383            context: "Failed to read Hello message".to_string(),
384            source: e,
385        })?;
386
387    #[cfg(feature = "tracing")]
388    tracing::debug!("Received handshake: {}", line.trim());
389
390    // Parse Hello message
391    let hello: HelloMessage =
392        serde_json::from_str(line.trim()).map_err(|e| DbError::JsonWithContext {
393            context: "Failed to parse Hello message".to_string(),
394            source: e,
395        })?;
396
397    #[cfg(feature = "tracing")]
398    tracing::debug!(
399        "Client hello: version={}, client={}",
400        hello.version,
401        hello.client
402    );
403
404    // Version validation: accept "1.0" or "1"
405    if hello.version != "1.0" && hello.version != "1" {
406        let error_msg = format!(
407            r#"{{"error":"unsupported_version","message":"Server supports version 1.0, client requested {}"}}"#,
408            hello.version
409        );
410
411        #[cfg(feature = "tracing")]
412        tracing::warn!("Unsupported version: {}", hello.version);
413
414        let _ = writer.write_all(error_msg.as_bytes()).await;
415        let _ = writer.write_all(b"\n").await;
416        let _ = writer.shutdown().await;
417
418        return Err(DbError::InvalidOperation {
419            operation: "handshake".to_string(),
420            reason: format!("Unsupported version: {}", hello.version),
421        });
422    }
423
424    // Check authentication if required
425    let authenticated = if let Some(expected_token) = &config.auth_token {
426        match &hello.auth_token {
427            Some(provided_token) if provided_token == expected_token => {
428                #[cfg(feature = "tracing")]
429                tracing::debug!("Authentication successful");
430                true
431            }
432            Some(_) => {
433                let error_msg =
434                    r#"{"error":"authentication_failed","message":"Invalid auth token"}"#;
435
436                #[cfg(feature = "tracing")]
437                tracing::warn!("Authentication failed: invalid token");
438
439                let _ = writer.write_all(error_msg.as_bytes()).await;
440                let _ = writer.write_all(b"\n").await;
441                let _ = writer.shutdown().await;
442
443                return Err(DbError::PermissionDenied {
444                    operation: "authentication".to_string(),
445                });
446            }
447            None => {
448                let error_msg =
449                    r#"{"error":"authentication_required","message":"Auth token required"}"#;
450
451                #[cfg(feature = "tracing")]
452                tracing::warn!("Authentication failed: no token provided");
453
454                let _ = writer.write_all(error_msg.as_bytes()).await;
455                let _ = writer.write_all(b"\n").await;
456                let _ = writer.shutdown().await;
457
458                return Err(DbError::PermissionDenied {
459                    operation: "authentication".to_string(),
460                });
461            }
462        }
463    } else {
464        false
465    };
466
467    // Determine permissions based on security policy
468    let permissions = match &config.security_policy {
469        crate::remote::SecurityPolicy::ReadOnly => vec!["read".to_string()],
470        crate::remote::SecurityPolicy::ReadWrite { .. } => {
471            vec!["read".to_string(), "write".to_string()]
472        }
473    };
474
475    // Get writable records by querying database for writable record names
476    let writable_records = match &config.security_policy {
477        crate::remote::SecurityPolicy::ReadOnly => vec![],
478        crate::remote::SecurityPolicy::ReadWrite {
479            writable_records: _writable_type_ids,
480        } => {
481            // Get all records from database
482            let all_records: Vec<RecordMetadata> = db.list_records();
483
484            // Filter to those that are marked writable
485            all_records
486                .into_iter()
487                .filter(|meta| meta.writable)
488                .map(|meta| meta.name)
489                .collect()
490        }
491    };
492
493    // Send Welcome message
494    let welcome = WelcomeMessage {
495        version: "1.0".to_string(),
496        server: "aimdb".to_string(),
497        permissions,
498        writable_records,
499        max_subscriptions: Some(config.subscription_queue_size),
500        authenticated: Some(authenticated),
501    };
502
503    let welcome_json = serde_json::to_string(&welcome).map_err(|e| DbError::JsonWithContext {
504        context: "Failed to serialize Welcome message".to_string(),
505        source: e,
506    })?;
507
508    writer
509        .write_all(welcome_json.as_bytes())
510        .await
511        .map_err(|e| DbError::IoWithContext {
512            context: "Failed to write Welcome message".to_string(),
513            source: e,
514        })?;
515
516    writer
517        .write_all(b"\n")
518        .await
519        .map_err(|e| DbError::IoWithContext {
520            context: "Failed to write Welcome newline".to_string(),
521            source: e,
522        })?;
523
524    #[cfg(feature = "tracing")]
525    tracing::info!("Sent Welcome message to client");
526
527    // Reunite the stream
528    let stream = reader
529        .into_inner()
530        .reunite(writer)
531        .map_err(|e| DbError::Io {
532            source: std::io::Error::other(e.to_string()),
533        })?;
534
535    Ok(BufReader::new(stream))
536}
537
538/// Handles a single request and returns a response
539///
540/// Dispatches to the appropriate handler based on the request method.
541///
542/// # Arguments
543/// * `db` - Database instance
544/// * `config` - Remote access configuration
545/// * `conn_state` - Connection state (for subscription management)
546/// * `request` - The parsed request
547///
548/// # Returns
549/// Response to send to the client
550#[cfg(feature = "std")]
551async fn handle_request<R>(
552    db: &Arc<AimDb<R>>,
553    config: &AimxConfig,
554    conn_state: &mut ConnectionState,
555    request: Request,
556) -> Response
557where
558    R: crate::RuntimeAdapter + crate::Spawn + 'static,
559{
560    #[cfg(feature = "tracing")]
561    tracing::debug!(
562        "Handling request: method={}, id={}",
563        request.method,
564        request.id
565    );
566
567    match request.method.as_str() {
568        "record.list" => handle_record_list(db, config, request.id).await,
569        "record.get" => handle_record_get(db, config, request.id, request.params).await,
570        "record.set" => handle_record_set(db, config, request.id, request.params).await,
571        "record.subscribe" => {
572            handle_record_subscribe(db, config, conn_state, request.id, request.params).await
573        }
574        "record.unsubscribe" => {
575            handle_record_unsubscribe(conn_state, request.id, request.params).await
576        }
577        "record.drain" => handle_record_drain(db, conn_state, request.id, request.params).await,
578        "record.query" => handle_record_query(db, request.id, request.params).await,
579        "graph.nodes" => handle_graph_nodes(db, request.id).await,
580        "graph.edges" => handle_graph_edges(db, request.id).await,
581        "graph.topo_order" => handle_graph_topo_order(db, request.id).await,
582        _ => {
583            #[cfg(feature = "tracing")]
584            tracing::warn!("Unknown method: {}", request.method);
585
586            Response::error(
587                request.id,
588                "method_not_found",
589                format!("Unknown method: {}", request.method),
590            )
591        }
592    }
593}
594
595/// Handles record.list method
596///
597/// Returns metadata for all registered records in the database.
598///
599/// # Arguments
600/// * `db` - Database instance
601/// * `config` - Remote access configuration (for permission checks)
602/// * `request_id` - Request ID for the response
603///
604/// # Returns
605/// Success response with array of RecordMetadata
606#[cfg(feature = "std")]
607async fn handle_record_list<R>(
608    db: &Arc<AimDb<R>>,
609    _config: &AimxConfig,
610    request_id: u64,
611) -> Response
612where
613    R: crate::RuntimeAdapter + crate::Spawn + 'static,
614{
615    #[cfg(feature = "tracing")]
616    tracing::debug!("Listing records");
617
618    // Get all record metadata from database
619    let records: Vec<RecordMetadata> = db.list_records();
620
621    #[cfg(feature = "tracing")]
622    tracing::debug!("Found {} records", records.len());
623
624    // Convert to JSON and return
625    Response::success(request_id, json!(records))
626}
627
628/// Handles record.get method
629///
630/// Returns the current value of a record as JSON.
631///
632/// # Arguments
633/// * `db` - Database instance
634/// * `config` - Remote access configuration (for permission checks)
635/// * `request_id` - Request ID for the response
636/// * `params` - Request parameters (must contain "record" field with record name)
637///
638/// # Returns
639/// Success response with record value as JSON, or error if:
640/// - Missing/invalid "record" parameter
641/// - Record not found
642/// - Record not configured with `.with_remote_access()`
643/// - No value available in atomic snapshot
644#[cfg(feature = "std")]
645async fn handle_record_get<R>(
646    db: &Arc<AimDb<R>>,
647    _config: &AimxConfig,
648    request_id: u64,
649    params: Option<serde_json::Value>,
650) -> Response
651where
652    R: crate::RuntimeAdapter + crate::Spawn + 'static,
653{
654    // Extract record name from params
655    let record_name = match params {
656        Some(serde_json::Value::Object(map)) => match map.get("record") {
657            Some(serde_json::Value::String(name)) => name.clone(),
658            _ => {
659                #[cfg(feature = "tracing")]
660                tracing::warn!("Missing or invalid 'record' parameter");
661
662                return Response::error(
663                    request_id,
664                    "invalid_params",
665                    "Missing or invalid 'record' parameter".to_string(),
666                );
667            }
668        },
669        _ => {
670            #[cfg(feature = "tracing")]
671            tracing::warn!("Missing params object");
672
673            return Response::error(
674                request_id,
675                "invalid_params",
676                "Missing params object".to_string(),
677            );
678        }
679    };
680
681    #[cfg(feature = "tracing")]
682    tracing::debug!("Getting value for record: {}", record_name);
683
684    // Try to peek the record's JSON value
685    match db.try_latest_as_json(&record_name) {
686        Some(value) => {
687            #[cfg(feature = "tracing")]
688            tracing::debug!("Successfully retrieved value for {}", record_name);
689
690            Response::success(request_id, value)
691        }
692        None => {
693            #[cfg(feature = "tracing")]
694            tracing::warn!("No value available for record: {}", record_name);
695
696            Response::error(
697                request_id,
698                "not_found",
699                format!("No value available for record: {}", record_name),
700            )
701        }
702    }
703}
704
705/// Handles record.set method
706///
707/// Sets a record value from JSON (write operation).
708///
709/// **SAFETY:** Enforces the "No Producer Override" rule:
710/// - Only allows writes to configuration records (producer_count == 0)
711/// - Prevents remote access from interfering with application logic
712///
713/// # Arguments
714/// * `db` - Database instance
715/// * `config` - Remote access configuration (for permission checks)
716/// * `request_id` - Request ID for the response
717/// * `params` - Request parameters (must contain "name" and "value" fields)
718///
719/// # Returns
720/// Success response, or error if:
721/// - Missing/invalid parameters
722/// - Record not found
723/// - Permission denied (not writable or has active producers)
724/// - Deserialization failed
725#[cfg(feature = "std")]
726async fn handle_record_set<R>(
727    db: &Arc<AimDb<R>>,
728    config: &AimxConfig,
729    request_id: u64,
730    params: Option<serde_json::Value>,
731) -> Response
732where
733    R: crate::RuntimeAdapter + crate::Spawn + 'static,
734{
735    use crate::remote::SecurityPolicy;
736
737    // Check if write operations are allowed
738    let writable_records = match &config.security_policy {
739        SecurityPolicy::ReadOnly => {
740            #[cfg(feature = "tracing")]
741            tracing::warn!("record.set called but security policy is ReadOnly");
742
743            return Response::error(
744                request_id,
745                "permission_denied",
746                "Write operations not allowed (ReadOnly security policy)".to_string(),
747            );
748        }
749        SecurityPolicy::ReadWrite { writable_records } => writable_records,
750    };
751
752    // Extract record name and value from params
753    let (record_name, value) = match params {
754        Some(serde_json::Value::Object(ref map)) => {
755            let name = match map.get("name") {
756                Some(serde_json::Value::String(n)) => n.clone(),
757                _ => {
758                    #[cfg(feature = "tracing")]
759                    tracing::warn!("Missing or invalid 'name' parameter in record.set");
760
761                    return Response::error(
762                        request_id,
763                        "invalid_params",
764                        "Missing or invalid 'name' parameter (expected string)".to_string(),
765                    );
766                }
767            };
768
769            let val = match map.get("value") {
770                Some(v) => v.clone(),
771                None => {
772                    #[cfg(feature = "tracing")]
773                    tracing::warn!("Missing 'value' parameter in record.set");
774
775                    return Response::error(
776                        request_id,
777                        "invalid_params",
778                        "Missing 'value' parameter".to_string(),
779                    );
780                }
781            };
782
783            (name, val)
784        }
785        _ => {
786            #[cfg(feature = "tracing")]
787            tracing::warn!("Missing params object in record.set");
788
789            return Response::error(
790                request_id,
791                "invalid_params",
792                "Missing params object".to_string(),
793            );
794        }
795    };
796
797    #[cfg(feature = "tracing")]
798    tracing::debug!("Setting value for record: {}", record_name);
799
800    // Check if record is in the writable_records set (using record key)
801    if !writable_records.contains(&record_name) {
802        #[cfg(feature = "tracing")]
803        tracing::warn!("Record '{}' not in writable_records set", record_name);
804
805        return Response::error(
806            request_id,
807            "permission_denied",
808            format!(
809                "Record '{}' is not writable. \
810                 Configure with .with_writable_record() to allow writes.",
811                record_name
812            ),
813        );
814    }
815
816    // Attempt to set the value
817    // This will enforce the "no producer override" rule internally
818    match db.set_record_from_json(&record_name, value) {
819        Ok(()) => {
820            #[cfg(feature = "tracing")]
821            tracing::info!("Successfully set value for record: {}", record_name);
822
823            // Get the updated value to return in response
824            let result = if let Some(updated_value) = db.try_latest_as_json(&record_name) {
825                serde_json::json!({
826                    "status": "success",
827                    "value": updated_value,
828                })
829            } else {
830                serde_json::json!({
831                    "status": "success",
832                })
833            };
834
835            Response::success(request_id, result)
836        }
837        Err(e) => {
838            #[cfg(feature = "tracing")]
839            tracing::error!("Failed to set value for record '{}': {}", record_name, e);
840
841            // Map internal errors to appropriate response codes
842            let (code, message) = match e {
843                crate::DbError::RecordKeyNotFound { key } => {
844                    ("not_found", format!("Record '{}' not found", key))
845                }
846                crate::DbError::PermissionDenied { operation } => {
847                    // This is the "has active producers" error
848                    ("permission_denied", operation)
849                }
850                crate::DbError::JsonWithContext { context, .. } => (
851                    "validation_error",
852                    format!("JSON validation failed: {}", context),
853                ),
854                crate::DbError::RuntimeError { message } => ("internal_error", message),
855                _ => ("internal_error", format!("Failed to set value: {}", e)),
856            };
857
858            Response::error(request_id, code, message)
859        }
860    }
861}
862
863/// Handles record.subscribe method
864///
865/// Subscribes to live updates for a record.
866///
867/// # Arguments
868/// * `db` - Database instance
869/// * `config` - Remote access configuration
870/// * `conn_state` - Connection state (for subscription tracking)
871/// * `request_id` - Request ID for the response
872/// * `params` - Request parameters (must contain "name" field with record name)
873///
874/// # Returns
875/// Success response with subscription_id and queue_size, or error if:
876/// - Missing/invalid parameters
877/// - Record not found
878/// - Too many subscriptions
879#[cfg(feature = "std")]
880async fn handle_record_subscribe<R>(
881    db: &Arc<AimDb<R>>,
882    config: &AimxConfig,
883    conn_state: &mut ConnectionState,
884    request_id: u64,
885    params: Option<serde_json::Value>,
886) -> Response
887where
888    R: crate::RuntimeAdapter + crate::Spawn + 'static,
889{
890    // Extract record name from params
891    let record_name = match params {
892        Some(serde_json::Value::Object(ref map)) => match map.get("name") {
893            Some(serde_json::Value::String(name)) => name.clone(),
894            _ => {
895                #[cfg(feature = "tracing")]
896                tracing::warn!("Missing or invalid 'name' parameter in record.subscribe");
897
898                return Response::error(
899                    request_id,
900                    "invalid_params",
901                    "Missing or invalid 'name' parameter (expected string)".to_string(),
902                );
903            }
904        },
905        _ => {
906            #[cfg(feature = "tracing")]
907            tracing::warn!("Missing params object in record.subscribe");
908
909            return Response::error(
910                request_id,
911                "invalid_params",
912                "Missing params object".to_string(),
913            );
914        }
915    };
916
917    // Optional: send_initial flag (default true)
918    let _send_initial = params
919        .as_ref()
920        .and_then(|p| p.as_object())
921        .and_then(|map| map.get("send_initial"))
922        .and_then(|v| v.as_bool())
923        .unwrap_or(true);
924
925    #[cfg(feature = "tracing")]
926    tracing::debug!("Subscribing to record: {}", record_name);
927
928    // Check max subscriptions limit
929    if conn_state.subscriptions.len() >= config.subscription_queue_size {
930        #[cfg(feature = "tracing")]
931        tracing::warn!(
932            "Too many subscriptions: {} (max: {})",
933            conn_state.subscriptions.len(),
934            config.subscription_queue_size
935        );
936
937        return Response::error(
938            request_id,
939            "too_many_subscriptions",
940            format!(
941                "Maximum subscriptions reached: {}",
942                config.subscription_queue_size
943            ),
944        );
945    }
946
947    // Generate unique subscription ID
948    let subscription_id = conn_state.generate_subscription_id();
949
950    // Subscribe to record updates via the database API (using record key)
951    let (value_rx, cancel_tx) =
952        match db.subscribe_record_updates(&record_name, config.subscription_queue_size) {
953            Ok(channels) => channels,
954            Err(e) => {
955                // Map internal errors to appropriate response codes
956                let (code, message) = match &e {
957                    crate::DbError::RecordKeyNotFound { key } => {
958                        #[cfg(feature = "tracing")]
959                        tracing::warn!("Record not found: {}", key);
960                        ("not_found", format!("Record '{}' not found", key))
961                    }
962                    _ => {
963                        #[cfg(feature = "tracing")]
964                        tracing::error!("Failed to subscribe to record updates: {}", e);
965                        ("internal_error", format!("Failed to subscribe: {}", e))
966                    }
967                };
968
969                return Response::error(request_id, code, message);
970            }
971        };
972
973    // Spawn event streaming task for this subscription
974    let event_tx = conn_state.event_tx.clone();
975    let sub_id_clone = subscription_id.clone();
976    let stream_handle = tokio::spawn(async move {
977        stream_subscription_events(sub_id_clone, value_rx, event_tx).await;
978    });
979
980    // Store subscription handle
981    let handle = SubscriptionHandle {
982        subscription_id: subscription_id.clone(),
983        record_name: record_name.clone(),
984        cancel_tx,
985    };
986    conn_state.add_subscription(handle);
987
988    // Detach the streaming task (it will run until cancelled or channel closes)
989    std::mem::drop(stream_handle);
990
991    #[cfg(feature = "tracing")]
992    tracing::info!(
993        "Created subscription {} for record {}",
994        subscription_id,
995        record_name
996    );
997
998    // Return success response
999    Response::success(
1000        request_id,
1001        json!({
1002            "subscription_id": subscription_id,
1003            "queue_size": config.subscription_queue_size,
1004        }),
1005    )
1006}
1007
1008/// Streams subscription events from value channel to event channel
1009///
1010/// Reads JSON values from the subscription's receiver and converts them
1011/// into Event messages with sequence numbers and timestamps.
1012///
1013/// # Arguments
1014/// * `subscription_id` - Unique subscription identifier
1015/// * `value_rx` - Receiver for JSON values from the database
1016/// * `event_tx` - Sender for Event messages to the client
1017#[cfg(feature = "std")]
1018async fn stream_subscription_events(
1019    subscription_id: String,
1020    mut value_rx: tokio::sync::mpsc::Receiver<serde_json::Value>,
1021    event_tx: tokio::sync::mpsc::UnboundedSender<Event>,
1022) {
1023    let mut sequence: u64 = 1;
1024
1025    #[cfg(feature = "tracing")]
1026    tracing::debug!(
1027        "Event streaming task started for subscription: {}",
1028        subscription_id
1029    );
1030
1031    while let Some(json_value) = value_rx.recv().await {
1032        // Generate timestamp in "secs.nanosecs" format
1033        let duration = std::time::SystemTime::now()
1034            .duration_since(std::time::UNIX_EPOCH)
1035            .unwrap_or_default();
1036        let timestamp = format!("{}.{:09}", duration.as_secs(), duration.subsec_nanos());
1037
1038        // Create event
1039        let event = Event {
1040            subscription_id: subscription_id.clone(),
1041            sequence,
1042            data: json_value,
1043            timestamp,
1044            dropped: None, // TODO: Implement dropped event tracking
1045        };
1046
1047        // Send event to the funnel
1048        if event_tx.send(event).is_err() {
1049            #[cfg(feature = "tracing")]
1050            tracing::debug!(
1051                "Event channel closed, terminating stream for subscription: {}",
1052                subscription_id
1053            );
1054            break;
1055        }
1056
1057        sequence += 1;
1058    }
1059
1060    #[cfg(feature = "tracing")]
1061    tracing::debug!(
1062        "Event streaming task terminated for subscription: {}",
1063        subscription_id
1064    );
1065}
1066
1067/// Handles record.unsubscribe method
1068///
1069/// Cancels an active subscription.
1070///
1071/// # Arguments
1072/// * `conn_state` - Connection state (for subscription tracking)
1073/// * `request_id` - Request ID for the response
1074/// * `params` - Request parameters (must contain "subscription_id" field)
1075///
1076/// # Returns
1077/// Success response, or error if subscription not found
1078#[cfg(feature = "std")]
1079async fn handle_record_unsubscribe(
1080    conn_state: &mut ConnectionState,
1081    request_id: u64,
1082    params: Option<serde_json::Value>,
1083) -> Response {
1084    // Parse subscription_id parameter
1085    let subscription_id = match params {
1086        Some(serde_json::Value::Object(ref map)) => match map.get("subscription_id") {
1087            Some(serde_json::Value::String(id)) => id.clone(),
1088            _ => {
1089                return Response::error(
1090                    request_id,
1091                    "invalid_params",
1092                    "Missing or invalid 'subscription_id' parameter".to_string(),
1093                )
1094            }
1095        },
1096        _ => {
1097            return Response::error(
1098                request_id,
1099                "invalid_params",
1100                "Missing 'subscription_id' parameter".to_string(),
1101            )
1102        }
1103    };
1104
1105    #[cfg(feature = "tracing")]
1106    tracing::debug!("Unsubscribing from subscription_id: {}", subscription_id);
1107
1108    // Look up and remove the subscription
1109    match conn_state.subscriptions.remove(&subscription_id) {
1110        Some(handle) => {
1111            // Send cancellation signal to the streaming task
1112            // It's okay if this fails (task may have already terminated)
1113            let _ = handle.cancel_tx.send(());
1114
1115            #[cfg(feature = "tracing")]
1116            tracing::debug!(
1117                "Cancelled subscription {} for record {}",
1118                subscription_id,
1119                handle.record_name
1120            );
1121
1122            Response::success(
1123                request_id,
1124                serde_json::json!({
1125                    "subscription_id": subscription_id,
1126                    "status": "cancelled"
1127                }),
1128            )
1129        }
1130        None => {
1131            #[cfg(feature = "tracing")]
1132            tracing::warn!("Subscription not found: {}", subscription_id);
1133
1134            Response::error(
1135                request_id,
1136                "not_found",
1137                format!("Subscription '{}' not found", subscription_id),
1138            )
1139        }
1140    }
1141}
1142
1143/// Handles record.drain method
1144///
1145/// Drains all pending values from a record's drain reader. On the first call for
1146/// a given record, creates a dedicated drain reader (returns empty). Subsequent
1147/// calls return all values accumulated since the previous drain.
1148///
1149/// # Arguments
1150/// * `db` - Database instance
1151/// * `conn_state` - Connection state (for drain reader management)
1152/// * `request_id` - Request ID for the response
1153/// * `params` - Request parameters (must contain "name" field, optional "limit")
1154///
1155/// # Returns
1156/// Success response with `record_name`, `values` array, and `count`, or error if:
1157/// - Missing/invalid parameters
1158/// - Record not found
1159/// - Record not configured with `.with_remote_access()`
1160#[cfg(feature = "std")]
1161async fn handle_record_drain<R>(
1162    db: &Arc<AimDb<R>>,
1163    conn_state: &mut ConnectionState,
1164    request_id: u64,
1165    params: Option<serde_json::Value>,
1166) -> Response
1167where
1168    R: crate::RuntimeAdapter + crate::Spawn + 'static,
1169{
1170    // Extract record name from params
1171    let record_name = match params {
1172        Some(serde_json::Value::Object(ref map)) => match map.get("name") {
1173            Some(serde_json::Value::String(name)) => name.clone(),
1174            _ => {
1175                return Response::error(
1176                    request_id,
1177                    "invalid_params",
1178                    "Missing or invalid 'name' parameter (expected string)".to_string(),
1179                );
1180            }
1181        },
1182        _ => {
1183            return Response::error(
1184                request_id,
1185                "invalid_params",
1186                "Missing params object".to_string(),
1187            );
1188        }
1189    };
1190
1191    // Optional: limit parameter
1192    // Use try_from instead of `as` to avoid silent truncation on 32-bit targets
1193    // (values that don't fit in usize are treated as "no limit").
1194    let limit = params
1195        .as_ref()
1196        .and_then(|p| p.as_object())
1197        .and_then(|map| map.get("limit"))
1198        .and_then(|v| v.as_u64())
1199        .map(|v| usize::try_from(v).unwrap_or(usize::MAX))
1200        .unwrap_or(usize::MAX);
1201
1202    #[cfg(feature = "tracing")]
1203    tracing::debug!(
1204        "Draining record: {} (limit: {})",
1205        record_name,
1206        if limit == usize::MAX {
1207            "all".to_string()
1208        } else {
1209            limit.to_string()
1210        }
1211    );
1212
1213    // Lazily create drain reader on first call for this record
1214    if !conn_state.drain_readers.contains_key(&record_name) {
1215        // Resolve record key → RecordId → AnyRecord → subscribe_json()
1216        let id = match db.inner().resolve_str(&record_name) {
1217            Some(id) => id,
1218            None => {
1219                return Response::error(
1220                    request_id,
1221                    "not_found",
1222                    format!("Record '{}' not found", record_name),
1223                );
1224            }
1225        };
1226
1227        let record = match db.inner().storage(id) {
1228            Some(r) => r,
1229            None => {
1230                return Response::error(
1231                    request_id,
1232                    "not_found",
1233                    format!("Record '{}' storage not found", record_name),
1234                );
1235            }
1236        };
1237
1238        let reader = match record.subscribe_json() {
1239            Ok(r) => r,
1240            Err(e) => {
1241                return Response::error(
1242                    request_id,
1243                    "remote_access_not_enabled",
1244                    format!(
1245                        "Record '{}' not configured with .with_remote_access(): {}",
1246                        record_name, e
1247                    ),
1248                );
1249            }
1250        };
1251
1252        conn_state.drain_readers.insert(record_name.clone(), reader);
1253    }
1254
1255    // Drain all pending values from the reader
1256    let reader = conn_state.drain_readers.get_mut(&record_name).unwrap();
1257    let mut values = Vec::new();
1258
1259    loop {
1260        if values.len() >= limit {
1261            break;
1262        }
1263        match reader.try_recv_json() {
1264            Ok(val) => values.push(val),
1265            Err(DbError::BufferEmpty) => break,
1266            Err(DbError::BufferLagged { .. }) => {
1267                // Ring overflowed since last drain — cursor resets.
1268                // Log warning, keep draining.
1269                #[cfg(feature = "tracing")]
1270                tracing::warn!(
1271                    "Drain reader lagged for record '{}' — some values were lost",
1272                    record_name
1273                );
1274                continue;
1275            }
1276            Err(_) => break,
1277        }
1278    }
1279
1280    let count = values.len();
1281
1282    #[cfg(feature = "tracing")]
1283    tracing::debug!("Drained {} values from record '{}'", count, record_name);
1284
1285    Response::success(
1286        request_id,
1287        json!({
1288            "record_name": record_name,
1289            "values": values,
1290            "count": count,
1291        }),
1292    )
1293}
1294
1295// ============================================================================
1296// Persistence Query (record.query)
1297// ============================================================================
1298
1299/// Type-erased query handler registered by `aimdb-persistence` via Extensions.
1300///
1301/// This keeps `aimdb-core` free of persistence-specific imports. The handler is
1302/// a boxed async function that accepts query parameters (record pattern, limit,
1303/// start/end timestamps) and returns a JSON value with the results.
1304///
1305/// Registered by `aimdb_persistence` via the `with_persistence()` builder extension.
1306pub type QueryHandlerFn = Box<
1307    dyn Fn(
1308            QueryHandlerParams,
1309        ) -> core::pin::Pin<
1310            Box<dyn core::future::Future<Output = Result<serde_json::Value, String>> + Send>,
1311        > + Send
1312        + Sync,
1313>;
1314
1315/// Parameters for the type-erased query handler.
1316#[derive(Debug, Clone)]
1317pub struct QueryHandlerParams {
1318    /// Record pattern (supports `*` wildcard).
1319    pub name: String,
1320    /// Maximum results per matching record.
1321    pub limit: Option<usize>,
1322    /// Optional start timestamp (Unix ms).
1323    pub start: Option<u64>,
1324    /// Optional end timestamp (Unix ms).
1325    pub end: Option<u64>,
1326}
1327
1328/// Handles `record.query` method.
1329///
1330/// Delegates to a [`QueryHandlerFn`] stored in the database's `Extensions`
1331/// TypeMap. If no handler is registered (i.e. persistence is not configured),
1332/// returns an error.
1333#[cfg(feature = "std")]
1334async fn handle_record_query<R>(
1335    db: &Arc<AimDb<R>>,
1336    request_id: u64,
1337    params: Option<serde_json::Value>,
1338) -> Response
1339where
1340    R: crate::RuntimeAdapter + crate::Spawn + 'static,
1341{
1342    // Extract the query handler from Extensions.
1343    let handler = match db.extensions().get::<QueryHandlerFn>() {
1344        Some(h) => h,
1345        None => {
1346            return Response::error(
1347                request_id,
1348                "not_configured",
1349                "Persistence not configured. Call .with_persistence() on the builder.".to_string(),
1350            );
1351        }
1352    };
1353
1354    // Parse parameters
1355    let (name, limit, start, end) = match &params {
1356        Some(serde_json::Value::Object(map)) => {
1357            let name = map
1358                .get("name")
1359                .and_then(|v| v.as_str())
1360                .unwrap_or("*")
1361                .to_string();
1362            let limit = map
1363                .get("limit")
1364                .and_then(|v| v.as_u64())
1365                .and_then(|v| usize::try_from(v).ok());
1366            let start = map.get("start").and_then(|v| v.as_u64());
1367            let end = map.get("end").and_then(|v| v.as_u64());
1368            (name, limit, start, end)
1369        }
1370        _ => ("*".to_string(), None, None, None),
1371    };
1372
1373    let query_params = QueryHandlerParams {
1374        name,
1375        limit,
1376        start,
1377        end,
1378    };
1379
1380    match handler(query_params).await {
1381        Ok(result) => Response::success(request_id, result),
1382        Err(msg) => Response::error(request_id, "query_error", msg),
1383    }
1384}
1385
1386// ============================================================================
1387// Graph Introspection Methods
1388// ============================================================================
1389
1390/// Handles graph.nodes method
1391///
1392/// Returns all nodes in the dependency graph with their metadata.
1393/// Each node represents a record with its origin, buffer type, and connections.
1394///
1395/// # Arguments
1396/// * `db` - Database instance
1397/// * `request_id` - Request ID for the response
1398///
1399/// # Returns
1400/// Success response with array of GraphNode objects:
1401/// - `key`: Record key (e.g., "temp.vienna")
1402/// - `origin`: How the record gets its values (source, link, transform, passive)
1403/// - `buffer_type`: Buffer type ("spmc_ring", "single_latest", "mailbox", "none")
1404/// - `buffer_capacity`: Optional buffer capacity
1405/// - `tap_count`: Number of taps attached
1406/// - `has_outbound_link`: Whether an outbound connector is configured
1407#[cfg(feature = "std")]
1408async fn handle_graph_nodes<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
1409where
1410    R: crate::RuntimeAdapter + crate::Spawn + 'static,
1411{
1412    #[cfg(feature = "tracing")]
1413    tracing::debug!("Getting dependency graph nodes");
1414
1415    let graph = db.inner().dependency_graph();
1416    let nodes = &graph.nodes;
1417
1418    #[cfg(feature = "tracing")]
1419    tracing::debug!("Returning {} graph nodes", nodes.len());
1420
1421    Response::success(request_id, json!(nodes))
1422}
1423
1424/// Handles graph.edges method
1425///
1426/// Returns all edges in the dependency graph representing data flow between records.
1427/// Edges are directed from source to target and include the edge type.
1428///
1429/// # Arguments
1430/// * `db` - Database instance
1431/// * `request_id` - Request ID for the response
1432///
1433/// # Returns
1434/// Success response with array of GraphEdge objects:
1435/// - `from`: Source record key
1436/// - `to`: Target record key
1437/// - `edge_type`: Type of connection (TransformInput, TransformJoinInput, etc.)
1438#[cfg(feature = "std")]
1439async fn handle_graph_edges<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
1440where
1441    R: crate::RuntimeAdapter + crate::Spawn + 'static,
1442{
1443    #[cfg(feature = "tracing")]
1444    tracing::debug!("Getting dependency graph edges");
1445
1446    let graph = db.inner().dependency_graph();
1447    let edges = &graph.edges;
1448
1449    #[cfg(feature = "tracing")]
1450    tracing::debug!("Returning {} graph edges", edges.len());
1451
1452    Response::success(request_id, json!(edges))
1453}
1454
1455/// Handles graph.topo_order method
1456///
1457/// Returns the topological ordering of records in the dependency graph.
1458/// This ordering ensures that all dependencies are processed before dependents.
1459/// Used for spawn ordering and understanding data flow.
1460///
1461/// # Arguments
1462/// * `db` - Database instance
1463/// * `request_id` - Request ID for the response
1464///
1465/// # Returns
1466/// Success response with array of record keys in topological order:
1467/// - Sources and passive records first
1468/// - Transform outputs after their inputs
1469/// - Respects the DAG structure for proper initialization order
1470#[cfg(feature = "std")]
1471async fn handle_graph_topo_order<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
1472where
1473    R: crate::RuntimeAdapter + crate::Spawn + 'static,
1474{
1475    #[cfg(feature = "tracing")]
1476    tracing::debug!("Getting topological order");
1477
1478    let graph = db.inner().dependency_graph();
1479    let topo_order = graph.topo_order();
1480
1481    #[cfg(feature = "tracing")]
1482    tracing::debug!(
1483        "Returning topological order with {} records",
1484        topo_order.len()
1485    );
1486
1487    Response::success(request_id, json!(topo_order))
1488}