1use crate::remote::{
15 AimxConfig, Event, HelloMessage, RecordMetadata, Request, Response, WelcomeMessage,
16};
17use crate::{AimDb, DbError, DbResult};
18
19#[cfg(feature = "std")]
20use std::collections::HashMap;
21#[cfg(feature = "std")]
22use std::sync::Arc;
23
24#[cfg(feature = "std")]
25use serde_json::json;
26#[cfg(feature = "std")]
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
28#[cfg(feature = "std")]
29use tokio::net::UnixStream;
30#[cfg(feature = "std")]
31use tokio::sync::mpsc;
32#[cfg(feature = "std")]
33use tokio::sync::oneshot;
34
/// Bookkeeping entry for one active subscription on a connection.
///
/// Instances are stored in `ConnectionState::subscriptions`, keyed by
/// `subscription_id`.
#[cfg(feature = "std")]
#[allow(dead_code)]
struct SubscriptionHandle {
    /// Identifier of the subscription; also used as the map key when the
    /// handle is registered on the connection.
    subscription_id: String,

    /// Name of the record this subscription was created for.
    record_name: String,

    /// One-shot sender that is fired (with `()`) when the subscription is
    /// cancelled, either explicitly or when the connection shuts down.
    // NOTE(review): presumably the receiving side stops the record's update
    // producer — the receiver is created outside this file; confirm there.
    cancel_tx: oneshot::Sender<()>,
}
51
/// Per-connection state owned by the connection handler loop.
#[cfg(feature = "std")]
struct ConnectionState {
    /// Active subscriptions, keyed by subscription id.
    subscriptions: HashMap<String, SubscriptionHandle>,

    /// Counter used to build the next subscription id (`sub-<n>`); starts at 1.
    next_subscription_id: u64,

    /// Sender side of the connection's event channel; cloned into every
    /// subscription streaming task so events reach the connection loop.
    event_tx: mpsc::UnboundedSender<Event>,

    /// JSON buffer readers created by `record.drain`, keyed by record name and
    /// reused across drain calls on the same connection.
    drain_readers: HashMap<String, Box<dyn crate::buffer::JsonBufferReader + Send>>,
}
71
#[cfg(feature = "std")]
impl ConnectionState {
    /// Builds an empty state: no subscriptions, no drain readers, and the
    /// subscription counter positioned at 1.
    fn new(event_tx: mpsc::UnboundedSender<Event>) -> Self {
        let subscriptions = HashMap::new();
        let drain_readers = HashMap::new();

        Self {
            subscriptions,
            next_subscription_id: 1,
            event_tx,
            drain_readers,
        }
    }

    /// Returns the next id of the form `sub-<n>` and advances the counter.
    fn generate_subscription_id(&mut self) -> String {
        let ordinal = self.next_subscription_id;
        self.next_subscription_id += 1;
        format!("sub-{}", ordinal)
    }

    /// Registers `handle` under its own `subscription_id`, replacing any
    /// previous entry stored under the same id.
    fn add_subscription(&mut self, handle: SubscriptionHandle) {
        let key = handle.subscription_id.clone();
        self.subscriptions.insert(key, handle);
    }

    /// Removes and returns the handle stored under `subscription_id`, if any.
    #[allow(dead_code)]
    fn remove_subscription(&mut self, subscription_id: &str) -> Option<SubscriptionHandle> {
        self.subscriptions.remove(subscription_id)
    }

    /// Empties the subscription map and fires every handle's cancel signal.
    ///
    /// A failed send (receiver already gone) is deliberately ignored.
    async fn cancel_all_subscriptions(&mut self) {
        #[cfg(feature = "tracing")]
        tracing::info!(
            "Canceling {} active subscriptions",
            self.subscriptions.len()
        );

        self.subscriptions.drain().for_each(|(_, handle)| {
            let _ = handle.cancel_tx.send(());
        });
    }
}
120
/// Serves one remote-access client over a Unix domain socket.
///
/// The function first runs the handshake, then loops until the client
/// disconnects or an I/O error occurs, multiplexing two sources:
///
/// * newline-delimited JSON requests read from the socket, each answered with
///   one response line, and
/// * subscription events queued on the connection's internal channel, each
///   forwarded to the client as one event line.
///
/// When the loop ends, every subscription still registered on the connection
/// is cancelled.
///
/// # Errors
///
/// Returns the handshake error if the handshake fails. Read/write failures
/// after the handshake only terminate the loop; the function then returns
/// `Ok(())`.
#[cfg(feature = "std")]
pub async fn handle_connection<R>(
    db: Arc<AimDb<R>>,
    config: AimxConfig,
    stream: UnixStream,
) -> DbResult<()>
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    #[cfg(feature = "tracing")]
    tracing::info!("New remote access connection established");

    let stream = match perform_handshake(stream, &config, &db).await {
        Ok(stream) => stream,
        Err(e) => {
            #[cfg(feature = "tracing")]
            tracing::warn!("Handshake failed: {}", e);
            return Err(e);
        }
    };

    #[cfg(feature = "tracing")]
    tracing::info!("Handshake complete, client ready");

    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<Event>();

    let mut conn_state = ConnectionState::new(event_tx);

    // `read_line` is not cancellation-safe: if the event branch of the
    // `select!` below completes first, a partially read request would be
    // discarded. `Lines::next_line` keeps the partial line inside the `Lines`
    // value across cancelled polls, so no request bytes are lost.
    let mut lines = stream.lines();

    loop {
        tokio::select! {
            read_result = lines.next_line() => {
                match read_result {
                    Ok(None) => {
                        // End of stream: the peer closed its side.
                        #[cfg(feature = "tracing")]
                        tracing::info!("Client disconnected gracefully");
                        break;
                    }
                    Ok(Some(line)) => {
                        let line = line.trim();

                        #[cfg(feature = "tracing")]
                        tracing::debug!("Received request: {}", line);

                        let request: Request = match serde_json::from_str(line) {
                            Ok(req) => req,
                            Err(e) => {
                                #[cfg(feature = "tracing")]
                                tracing::warn!("Failed to parse request: {}", e);

                                // The request id is unknown at this point, so 0 is used.
                                let error_response =
                                    Response::error(0, "parse_error", format!("Invalid JSON: {}", e));
                                if let Err(_e) =
                                    send_response(lines.get_mut(), &error_response).await
                                {
                                    #[cfg(feature = "tracing")]
                                    tracing::error!("Failed to send error response: {}", _e);
                                    break;
                                }
                                continue;
                            }
                        };

                        let response =
                            handle_request(&db, &config, &mut conn_state, request).await;

                        if let Err(_e) = send_response(lines.get_mut(), &response).await {
                            #[cfg(feature = "tracing")]
                            tracing::error!("Failed to send response: {}", _e);
                            break;
                        }
                    }
                    Err(_e) => {
                        #[cfg(feature = "tracing")]
                        tracing::error!("Error reading from stream: {}", _e);
                        break;
                    }
                }
            }

            Some(event) = event_rx.recv() => {
                if let Err(_e) = send_event(lines.get_mut(), &event).await {
                    #[cfg(feature = "tracing")]
                    tracing::error!("Failed to send event: {}", _e);
                    break;
                }
            }
        }
    }

    conn_state.cancel_all_subscriptions().await;

    #[cfg(feature = "tracing")]
    tracing::info!("Connection handler terminating");

    Ok(())
}
258
/// Writes one subscription event to the client as a single JSON line.
///
/// The event is wrapped in an object under the `"event"` key, serialized,
/// written to the underlying socket and followed by a `\n` terminator.
///
/// # Errors
///
/// Returns `DbError::JsonWithContext` if serialization fails and
/// `DbError::IoWithContext` if either write fails.
#[cfg(feature = "std")]
async fn send_event(stream: &mut BufReader<UnixStream>, event: &Event) -> DbResult<()> {
    let envelope = json!({ "event": event });

    let payload = match serde_json::to_string(&envelope) {
        Ok(text) => text,
        Err(source) => {
            return Err(DbError::JsonWithContext {
                context: "Failed to serialize event".to_string(),
                source,
            });
        }
    };

    // Writes go straight to the socket wrapped by the buffered reader.
    let socket = stream.get_mut();

    if let Err(source) = socket.write_all(payload.as_bytes()).await {
        return Err(DbError::IoWithContext {
            context: "Failed to write event".to_string(),
            source,
        });
    }

    if let Err(source) = socket.write_all(b"\n").await {
        return Err(DbError::IoWithContext {
            context: "Failed to write event newline".to_string(),
            source,
        });
    }

    #[cfg(feature = "tracing")]
    tracing::trace!("Sent event for subscription: {}", event.subscription_id);

    Ok(())
}
302
/// Writes one response to the client as a single JSON line.
///
/// The response is serialized, written to the underlying socket and followed
/// by a `\n` terminator.
///
/// # Errors
///
/// Returns `DbError::JsonWithContext` if serialization fails and
/// `DbError::IoWithContext` if either write fails.
#[cfg(feature = "std")]
async fn send_response(stream: &mut BufReader<UnixStream>, response: &Response) -> DbResult<()> {
    let payload = match serde_json::to_string(response) {
        Ok(text) => text,
        Err(source) => {
            return Err(DbError::JsonWithContext {
                context: "Failed to serialize response".to_string(),
                source,
            });
        }
    };

    // Writes go straight to the socket wrapped by the buffered reader.
    let socket = stream.get_mut();

    if let Err(source) = socket.write_all(payload.as_bytes()).await {
        return Err(DbError::IoWithContext {
            context: "Failed to write response".to_string(),
            source,
        });
    }

    if let Err(source) = socket.write_all(b"\n").await {
        return Err(DbError::IoWithContext {
            context: "Failed to write response newline".to_string(),
            source,
        });
    }

    #[cfg(feature = "tracing")]
    tracing::debug!("Sent response");

    Ok(())
}
343
/// Runs the Hello/Welcome handshake on a freshly accepted connection.
///
/// Steps, in order:
///
/// 1. Read one line and parse it as a `HelloMessage`.
/// 2. Reject protocol versions other than `"1.0"` / `"1"`.
/// 3. If the server configuration carries an auth token, require the client
///    to present the same token; without a configured token the client is
///    accepted and reported as not authenticated.
/// 4. Reply with a `WelcomeMessage` describing permissions, writable records
///    and the subscription limit.
///
/// On rejection a JSON error line is sent best-effort and the write side of
/// the socket is shut down before the error is returned.
///
/// The socket is wrapped in a single `BufReader` for the whole handshake and
/// that same reader is returned. Bytes the client may already have sent after
/// its Hello line therefore stay in the buffer instead of being dropped by a
/// split / `into_inner` / reunite round-trip.
///
/// # Errors
///
/// * `DbError::IoWithContext` — reading the Hello or writing the Welcome failed.
/// * `DbError::JsonWithContext` — the Hello could not be parsed or the Welcome
///   could not be serialized.
/// * `DbError::InvalidOperation` — unsupported protocol version.
/// * `DbError::PermissionDenied` — missing or wrong auth token.
#[cfg(feature = "std")]
async fn perform_handshake<R>(
    stream: UnixStream,
    config: &AimxConfig,
    db: &Arc<AimDb<R>>,
) -> DbResult<BufReader<UnixStream>>
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    let mut stream = BufReader::new(stream);

    let mut line = String::new();
    stream
        .read_line(&mut line)
        .await
        .map_err(|e| DbError::IoWithContext {
            context: "Failed to read Hello message".to_string(),
            source: e,
        })?;

    #[cfg(feature = "tracing")]
    tracing::debug!("Received handshake: {}", line.trim());

    let hello: HelloMessage =
        serde_json::from_str(line.trim()).map_err(|e| DbError::JsonWithContext {
            context: "Failed to parse Hello message".to_string(),
            source: e,
        })?;

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Client hello: version={}, client={}",
        hello.version,
        hello.client
    );

    if hello.version != "1.0" && hello.version != "1" {
        // The version string is client-controlled, so the error object is
        // built with `json!` to get proper escaping instead of splicing the
        // value into a hand-written JSON literal.
        let error_msg = json!({
            "error": "unsupported_version",
            "message": format!(
                "Server supports version 1.0, client requested {}",
                hello.version
            ),
        })
        .to_string();

        #[cfg(feature = "tracing")]
        tracing::warn!("Unsupported version: {}", hello.version);

        reject_handshake(&mut stream, &error_msg).await;

        return Err(DbError::InvalidOperation {
            operation: "handshake".to_string(),
            reason: format!("Unsupported version: {}", hello.version),
        });
    }

    // NOTE(review): the token check below is a plain `==`; consider a
    // constant-time comparison if timing side channels are in scope.
    let authenticated = if let Some(expected_token) = &config.auth_token {
        match &hello.auth_token {
            Some(provided_token) if provided_token == expected_token => {
                #[cfg(feature = "tracing")]
                tracing::debug!("Authentication successful");
                true
            }
            Some(_) => {
                #[cfg(feature = "tracing")]
                tracing::warn!("Authentication failed: invalid token");

                reject_handshake(
                    &mut stream,
                    r#"{"error":"authentication_failed","message":"Invalid auth token"}"#,
                )
                .await;

                return Err(DbError::PermissionDenied {
                    operation: "authentication".to_string(),
                });
            }
            None => {
                #[cfg(feature = "tracing")]
                tracing::warn!("Authentication failed: no token provided");

                reject_handshake(
                    &mut stream,
                    r#"{"error":"authentication_required","message":"Auth token required"}"#,
                )
                .await;

                return Err(DbError::PermissionDenied {
                    operation: "authentication".to_string(),
                });
            }
        }
    } else {
        // No token configured: the client is accepted but not authenticated.
        false
    };

    let permissions = match &config.security_policy {
        crate::remote::SecurityPolicy::ReadOnly => vec!["read".to_string()],
        crate::remote::SecurityPolicy::ReadWrite { .. } => {
            vec!["read".to_string(), "write".to_string()]
        }
    };

    let writable_records = match &config.security_policy {
        crate::remote::SecurityPolicy::ReadOnly => vec![],
        crate::remote::SecurityPolicy::ReadWrite {
            writable_records: _writable_type_ids,
        } => {
            // The advertised list comes from the per-record `writable` flag in
            // the database metadata, not from the policy's own set.
            let all_records: Vec<RecordMetadata> = db.list_records();

            all_records
                .into_iter()
                .filter(|meta| meta.writable)
                .map(|meta| meta.name)
                .collect()
        }
    };

    let welcome = WelcomeMessage {
        version: "1.0".to_string(),
        server: "aimdb".to_string(),
        permissions,
        writable_records,
        max_subscriptions: Some(config.subscription_queue_size),
        authenticated: Some(authenticated),
    };

    let welcome_json = serde_json::to_string(&welcome).map_err(|e| DbError::JsonWithContext {
        context: "Failed to serialize Welcome message".to_string(),
        source: e,
    })?;

    stream
        .get_mut()
        .write_all(welcome_json.as_bytes())
        .await
        .map_err(|e| DbError::IoWithContext {
            context: "Failed to write Welcome message".to_string(),
            source: e,
        })?;

    stream
        .get_mut()
        .write_all(b"\n")
        .await
        .map_err(|e| DbError::IoWithContext {
            context: "Failed to write Welcome newline".to_string(),
            source: e,
        })?;

    #[cfg(feature = "tracing")]
    tracing::info!("Sent Welcome message to client");

    Ok(stream)
}

/// Sends `payload` plus a newline to the client and shuts down the write side.
///
/// Best effort: the handshake is being rejected anyway, so write and shutdown
/// failures are deliberately ignored.
#[cfg(feature = "std")]
async fn reject_handshake(stream: &mut BufReader<UnixStream>, payload: &str) {
    let socket = stream.get_mut();
    let _ = socket.write_all(payload.as_bytes()).await;
    let _ = socket.write_all(b"\n").await;
    let _ = AsyncWriteExt::shutdown(socket).await;
}
537
/// Dispatches a parsed request to the handler registered for its method name.
///
/// Unrecognized methods yield a `method_not_found` error response carrying
/// the request id.
#[cfg(feature = "std")]
async fn handle_request<R>(
    db: &Arc<AimDb<R>>,
    config: &AimxConfig,
    conn_state: &mut ConnectionState,
    request: Request,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Handling request: method={}, id={}",
        request.method,
        request.id
    );

    // Take the request apart once so each arm can consume what it needs.
    let Request {
        id, method, params, ..
    } = request;

    match method.as_str() {
        // Record operations.
        "record.list" => handle_record_list(db, config, id).await,
        "record.get" => handle_record_get(db, config, id, params).await,
        "record.set" => handle_record_set(db, config, id, params).await,
        "record.subscribe" => handle_record_subscribe(db, config, conn_state, id, params).await,
        "record.unsubscribe" => handle_record_unsubscribe(conn_state, id, params).await,
        "record.drain" => handle_record_drain(db, conn_state, id, params).await,
        "record.query" => handle_record_query(db, id, params).await,

        // Dependency graph introspection.
        "graph.nodes" => handle_graph_nodes(db, id).await,
        "graph.edges" => handle_graph_edges(db, id).await,
        "graph.topo_order" => handle_graph_topo_order(db, id).await,

        unknown => {
            #[cfg(feature = "tracing")]
            tracing::warn!("Unknown method: {}", unknown);

            Response::error(
                id,
                "method_not_found",
                format!("Unknown method: {}", unknown),
            )
        }
    }
}
594
/// Handles `record.list`: responds with the metadata of every record the
/// database reports, serialized as a JSON value.
#[cfg(feature = "std")]
async fn handle_record_list<R>(
    db: &Arc<AimDb<R>>,
    _config: &AimxConfig,
    request_id: u64,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    #[cfg(feature = "tracing")]
    tracing::debug!("Listing records");

    let metadata: Vec<RecordMetadata> = db.list_records();

    #[cfg(feature = "tracing")]
    tracing::debug!("Found {} records", metadata.len());

    let payload = json!(metadata);
    Response::success(request_id, payload)
}
627
/// Handles `record.get`: responds with the latest JSON value of one record.
///
/// Expects `params` to be an object with a string field `record` naming the
/// record. Responds with `invalid_params` when the object or the field is
/// missing or malformed, and with `not_found` when the database has no value
/// for that name.
#[cfg(feature = "std")]
async fn handle_record_get<R>(
    db: &Arc<AimDb<R>>,
    _config: &AimxConfig,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    // Guard 1: params must be a JSON object.
    let Some(serde_json::Value::Object(fields)) = params else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing params object");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing params object".to_string(),
        );
    };

    // Guard 2: the object must carry a string under "record".
    let Some(record_name) = fields
        .get("record")
        .and_then(|field| field.as_str())
        .map(str::to_owned)
    else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing or invalid 'record' parameter");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing or invalid 'record' parameter".to_string(),
        );
    };

    #[cfg(feature = "tracing")]
    tracing::debug!("Getting value for record: {}", record_name);

    if let Some(latest) = db.try_latest_as_json(&record_name) {
        #[cfg(feature = "tracing")]
        tracing::debug!("Successfully retrieved value for {}", record_name);

        return Response::success(request_id, latest);
    }

    #[cfg(feature = "tracing")]
    tracing::warn!("No value available for record: {}", record_name);

    Response::error(
        request_id,
        "not_found",
        format!("No value available for record: {}", record_name),
    )
}
704
/// Handles `record.set`: writes a JSON value into a record.
///
/// Checks are applied in this order, and the first failure determines the
/// response:
///
/// 1. The security policy must be `ReadWrite` (`permission_denied` otherwise).
/// 2. `params` must be an object with a string `name` and a `value`
///    (`invalid_params` otherwise).
/// 3. `name` must be listed in the policy's writable records
///    (`permission_denied` otherwise).
/// 4. The database write itself; its error is mapped to a response code.
///
/// On success the response carries `"status": "success"` and, when the
/// database can provide it, the record's latest value under `"value"`.
#[cfg(feature = "std")]
async fn handle_record_set<R>(
    db: &Arc<AimDb<R>>,
    config: &AimxConfig,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    use crate::remote::SecurityPolicy;

    let writable_records = match &config.security_policy {
        SecurityPolicy::ReadWrite { writable_records } => writable_records,
        SecurityPolicy::ReadOnly => {
            #[cfg(feature = "tracing")]
            tracing::warn!("record.set called but security policy is ReadOnly");

            return Response::error(
                request_id,
                "permission_denied",
                "Write operations not allowed (ReadOnly security policy)".to_string(),
            );
        }
    };

    let Some(serde_json::Value::Object(fields)) = &params else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing params object in record.set");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing params object".to_string(),
        );
    };

    let Some(serde_json::Value::String(name)) = fields.get("name") else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing or invalid 'name' parameter in record.set");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing or invalid 'name' parameter (expected string)".to_string(),
        );
    };
    let record_name = name.clone();

    let Some(value) = fields.get("value").cloned() else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing 'value' parameter in record.set");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing 'value' parameter".to_string(),
        );
    };

    #[cfg(feature = "tracing")]
    tracing::debug!("Setting value for record: {}", record_name);

    let is_writable = writable_records.contains(&record_name);
    if !is_writable {
        #[cfg(feature = "tracing")]
        tracing::warn!("Record '{}' not in writable_records set", record_name);

        return Response::error(
            request_id,
            "permission_denied",
            format!(
                "Record '{}' is not writable. \
                 Configure with .with_writable_record() to allow writes.",
                record_name
            ),
        );
    }

    if let Err(e) = db.set_record_from_json(&record_name, value) {
        #[cfg(feature = "tracing")]
        tracing::error!("Failed to set value for record '{}': {}", record_name, e);

        // Translate the database error into a protocol error code + message.
        let (code, message) = match e {
            crate::DbError::RecordKeyNotFound { key } => {
                ("not_found", format!("Record '{}' not found", key))
            }
            crate::DbError::PermissionDenied { operation } => ("permission_denied", operation),
            crate::DbError::JsonWithContext { context, .. } => (
                "validation_error",
                format!("JSON validation failed: {}", context),
            ),
            crate::DbError::RuntimeError { message } => ("internal_error", message),
            _ => ("internal_error", format!("Failed to set value: {}", e)),
        };

        return Response::error(request_id, code, message);
    }

    #[cfg(feature = "tracing")]
    tracing::info!("Successfully set value for record: {}", record_name);

    // Echo the stored value back when the database can provide it.
    let result = match db.try_latest_as_json(&record_name) {
        Some(updated_value) => serde_json::json!({
            "status": "success",
            "value": updated_value,
        }),
        None => serde_json::json!({
            "status": "success",
        }),
    };

    Response::success(request_id, result)
}
862
/// Handles `record.subscribe`: starts streaming a record's updates to the
/// client as events.
///
/// Expects `params` to be an object with a string `name`. The handler
/// refuses new subscriptions once the connection already holds
/// `config.subscription_queue_size` of them. On success it spawns a detached
/// task that forwards updates into the connection's event channel, registers
/// a `SubscriptionHandle`, and responds with the new `subscription_id` and
/// the `queue_size`.
///
/// Error responses: `invalid_params`, `too_many_subscriptions`, `not_found`
/// (unknown record) and `internal_error` (any other subscribe failure).
#[cfg(feature = "std")]
async fn handle_record_subscribe<R>(
    db: &Arc<AimDb<R>>,
    config: &AimxConfig,
    conn_state: &mut ConnectionState,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    let Some(serde_json::Value::Object(fields)) = &params else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing params object in record.subscribe");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing params object".to_string(),
        );
    };

    let Some(serde_json::Value::String(name)) = fields.get("name") else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing or invalid 'name' parameter in record.subscribe");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing or invalid 'name' parameter (expected string)".to_string(),
        );
    };
    let record_name = name.clone();

    // Parsed (default `true`) but not acted upon anywhere in this handler.
    let _send_initial = fields
        .get("send_initial")
        .and_then(|flag| flag.as_bool())
        .unwrap_or(true);

    #[cfg(feature = "tracing")]
    tracing::debug!("Subscribing to record: {}", record_name);

    let max_subscriptions = config.subscription_queue_size;
    if conn_state.subscriptions.len() >= max_subscriptions {
        #[cfg(feature = "tracing")]
        tracing::warn!(
            "Too many subscriptions: {} (max: {})",
            conn_state.subscriptions.len(),
            config.subscription_queue_size
        );

        return Response::error(
            request_id,
            "too_many_subscriptions",
            format!(
                "Maximum subscriptions reached: {}",
                config.subscription_queue_size
            ),
        );
    }

    let subscription_id = conn_state.generate_subscription_id();

    let subscribed = db.subscribe_record_updates(&record_name, config.subscription_queue_size);
    let (value_rx, cancel_tx) = match subscribed {
        Ok(channels) => channels,
        Err(e) => {
            let (code, message) = if let crate::DbError::RecordKeyNotFound { key } = &e {
                #[cfg(feature = "tracing")]
                tracing::warn!("Record not found: {}", key);
                ("not_found", format!("Record '{}' not found", key))
            } else {
                #[cfg(feature = "tracing")]
                tracing::error!("Failed to subscribe to record updates: {}", e);
                ("internal_error", format!("Failed to subscribe: {}", e))
            };

            return Response::error(request_id, code, message);
        }
    };

    // Forward updates to the connection loop from a detached task; dropping
    // the join handle does not stop the task.
    let forwarder = stream_subscription_events(
        subscription_id.clone(),
        value_rx,
        conn_state.event_tx.clone(),
    );
    drop(tokio::spawn(forwarder));

    conn_state.add_subscription(SubscriptionHandle {
        subscription_id: subscription_id.clone(),
        record_name: record_name.clone(),
        cancel_tx,
    });

    #[cfg(feature = "tracing")]
    tracing::info!(
        "Created subscription {} for record {}",
        subscription_id,
        record_name
    );

    let payload = json!({
        "subscription_id": subscription_id,
        "queue_size": config.subscription_queue_size,
    });
    Response::success(request_id, payload)
}
1007
/// Forwards every value received on `value_rx` into `event_tx` as an `Event`.
///
/// Each event carries the subscription id, a sequence number starting at 1
/// and increasing by one per forwarded value, the value itself, and a
/// timestamp formatted as `<seconds>.<nanoseconds, 9 digits>` since the Unix
/// epoch. The task ends when `value_rx` yields `None` or when sending on
/// `event_tx` fails.
#[cfg(feature = "std")]
async fn stream_subscription_events(
    subscription_id: String,
    mut value_rx: tokio::sync::mpsc::Receiver<serde_json::Value>,
    event_tx: tokio::sync::mpsc::UnboundedSender<Event>,
) {
    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Event streaming task started for subscription: {}",
        subscription_id
    );

    let mut sequence: u64 = 1;

    loop {
        let Some(data) = value_rx.recv().await else {
            break;
        };

        // A clock set before the epoch falls back to a zero duration.
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default();
        let secs = since_epoch.as_secs();
        let nanos = since_epoch.subsec_nanos();

        let delivered = event_tx.send(Event {
            subscription_id: subscription_id.clone(),
            sequence,
            data,
            timestamp: format!("{}.{:09}", secs, nanos),
            dropped: None,
        });

        if delivered.is_err() {
            #[cfg(feature = "tracing")]
            tracing::debug!(
                "Event channel closed, terminating stream for subscription: {}",
                subscription_id
            );
            break;
        }

        sequence += 1;
    }

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Event streaming task terminated for subscription: {}",
        subscription_id
    );
}
1066
/// Handles `record.unsubscribe`: cancels one subscription of this connection.
///
/// Expects `params` to be an object with a string `subscription_id`. The
/// matching handle is removed from the connection state and its cancel signal
/// is fired (a failed send is ignored). Responds with `invalid_params` for a
/// malformed request and `not_found` for an unknown subscription id.
#[cfg(feature = "std")]
async fn handle_record_unsubscribe(
    conn_state: &mut ConnectionState,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response {
    let Some(serde_json::Value::Object(fields)) = &params else {
        return Response::error(
            request_id,
            "invalid_params",
            "Missing 'subscription_id' parameter".to_string(),
        );
    };

    let Some(serde_json::Value::String(id)) = fields.get("subscription_id") else {
        return Response::error(
            request_id,
            "invalid_params",
            "Missing or invalid 'subscription_id' parameter".to_string(),
        );
    };
    let subscription_id = id.clone();

    #[cfg(feature = "tracing")]
    tracing::debug!("Unsubscribing from subscription_id: {}", subscription_id);

    let Some(handle) = conn_state.remove_subscription(&subscription_id) else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Subscription not found: {}", subscription_id);

        return Response::error(
            request_id,
            "not_found",
            format!("Subscription '{}' not found", subscription_id),
        );
    };

    let _ = handle.cancel_tx.send(());

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Cancelled subscription {} for record {}",
        subscription_id,
        handle.record_name
    );

    let payload = serde_json::json!({
        "subscription_id": subscription_id,
        "status": "cancelled"
    });
    Response::success(request_id, payload)
}
1142
/// Handles `record.drain`: returns the values currently pending on this
/// connection's drain reader for a record, without waiting for new ones.
///
/// Expects `params` to be an object with a string `name`; an optional
/// unsigned integer `limit` caps how many values are returned (no cap when
/// absent or not an unsigned integer). The first drain of a record on a
/// connection creates a JSON reader for it and caches it in the connection
/// state; later drains reuse that reader.
///
/// The response contains `record_name`, the drained `values`, and their
/// `count`.
///
/// Error responses: `invalid_params`, `not_found` (unknown record or missing
/// storage) and `remote_access_not_enabled` (the record cannot provide a JSON
/// reader).
#[cfg(feature = "std")]
async fn handle_record_drain<R>(
    db: &Arc<AimDb<R>>,
    conn_state: &mut ConnectionState,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    let Some(serde_json::Value::Object(fields)) = &params else {
        return Response::error(
            request_id,
            "invalid_params",
            "Missing params object".to_string(),
        );
    };

    let Some(serde_json::Value::String(name)) = fields.get("name") else {
        return Response::error(
            request_id,
            "invalid_params",
            "Missing or invalid 'name' parameter (expected string)".to_string(),
        );
    };
    let record_name = name.clone();

    // Values that do not fit in `usize` saturate to "no limit".
    let limit = fields
        .get("limit")
        .and_then(|v| v.as_u64())
        .map(|v| usize::try_from(v).unwrap_or(usize::MAX))
        .unwrap_or(usize::MAX);

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Draining record: {} (limit: {})",
        record_name,
        if limit == usize::MAX {
            "all".to_string()
        } else {
            limit.to_string()
        }
    );

    // Look the reader up once; create and cache it on first use. The Entry API
    // avoids a separate contains/insert/get sequence and the `unwrap` that came
    // with it.
    let reader = match conn_state.drain_readers.entry(record_name.clone()) {
        std::collections::hash_map::Entry::Occupied(slot) => slot.into_mut(),
        std::collections::hash_map::Entry::Vacant(slot) => {
            let Some(id) = db.inner().resolve_str(&record_name) else {
                return Response::error(
                    request_id,
                    "not_found",
                    format!("Record '{}' not found", record_name),
                );
            };

            let Some(record) = db.inner().storage(id) else {
                return Response::error(
                    request_id,
                    "not_found",
                    format!("Record '{}' storage not found", record_name),
                );
            };

            let reader = match record.subscribe_json() {
                Ok(r) => r,
                Err(e) => {
                    return Response::error(
                        request_id,
                        "remote_access_not_enabled",
                        format!(
                            "Record '{}' not configured with .with_remote_access(): {}",
                            record_name, e
                        ),
                    );
                }
            };

            slot.insert(reader)
        }
    };

    let mut values = Vec::new();

    while values.len() < limit {
        match reader.try_recv_json() {
            Ok(val) => values.push(val),
            Err(DbError::BufferEmpty) => break,
            Err(DbError::BufferLagged { .. }) => {
                // Some values were overwritten before they could be read;
                // keep draining what is still available.
                #[cfg(feature = "tracing")]
                tracing::warn!(
                    "Drain reader lagged for record '{}' — some values were lost",
                    record_name
                );
                continue;
            }
            Err(_e) => {
                // Any other error ends the drain; whatever was collected so
                // far is still returned, but the cause is no longer silent.
                #[cfg(feature = "tracing")]
                tracing::warn!(
                    "Drain reader error for record '{}': {}",
                    record_name,
                    _e
                );
                break;
            }
        }
    }

    let count = values.len();

    #[cfg(feature = "tracing")]
    tracing::debug!("Drained {} values from record '{}'", count, record_name);

    Response::success(
        request_id,
        json!({
            "record_name": record_name,
            "values": values,
            "count": count,
        }),
    )
}
1294
/// Type-erased asynchronous query callback used by `record.query`.
///
/// The callback receives the parsed [`QueryHandlerParams`] and returns a
/// boxed, pinned, `Send` future that resolves to either the JSON result or an
/// error message. The `record.query` handler looks a value of this type up in
/// the database extensions and reports `not_configured` when none is present.
pub type QueryHandlerFn = Box<
    dyn Fn(
            QueryHandlerParams,
        ) -> core::pin::Pin<
            Box<dyn core::future::Future<Output = Result<serde_json::Value, String>> + Send>,
        > + Send
        + Sync,
>;
1314
/// Arguments handed to a [`QueryHandlerFn`], extracted from the `params`
/// object of a `record.query` request.
#[derive(Debug, Clone)]
pub struct QueryHandlerParams {
    /// Value of the request's `name` field; the request handler substitutes
    /// `"*"` when the field is absent or not a string.
    // NOTE(review): "*" presumably means "all records" — confirm with the
    // query handler implementation.
    pub name: String,
    /// Value of the request's `limit` field, if it is an unsigned integer that
    /// fits in `usize`.
    pub limit: Option<usize>,
    /// Value of the request's `start` field, if it is an unsigned integer.
    // NOTE(review): unit/meaning (likely a timestamp bound) is not visible
    // here — confirm.
    pub start: Option<u64>,
    /// Value of the request's `end` field, if it is an unsigned integer.
    // NOTE(review): unit/meaning (likely a timestamp bound) is not visible
    // here — confirm.
    pub end: Option<u64>,
}
1327
/// Handles `record.query`: delegates to the query callback registered in the
/// database extensions.
///
/// Optional `params` fields: `name` (string, defaults to `"*"`), `limit`,
/// `start` and `end` (unsigned integers; ignored when absent or of another
/// type). When `params` is missing or not an object, all defaults apply.
///
/// Responds with `not_configured` when no [`QueryHandlerFn`] is registered and
/// with `query_error` when the callback returns an error message.
#[cfg(feature = "std")]
async fn handle_record_query<R>(
    db: &Arc<AimDb<R>>,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    let Some(handler) = db.extensions().get::<QueryHandlerFn>() else {
        return Response::error(
            request_id,
            "not_configured",
            "Persistence not configured. Call .with_persistence() on the builder.".to_string(),
        );
    };

    // Borrow the params object; every field is optional.
    let (name, limit, start, end) = match &params {
        Some(serde_json::Value::Object(map)) => {
            let name = map
                .get("name")
                .and_then(|v| v.as_str())
                .unwrap_or("*")
                .to_string();
            // A limit that does not fit in `usize` is treated as "no limit".
            let limit = map
                .get("limit")
                .and_then(|v| v.as_u64())
                .and_then(|v| usize::try_from(v).ok());
            let start = map.get("start").and_then(|v| v.as_u64());
            let end = map.get("end").and_then(|v| v.as_u64());
            (name, limit, start, end)
        }
        _ => ("*".to_string(), None, None, None),
    };

    let query_params = QueryHandlerParams {
        name,
        limit,
        start,
        end,
    };

    match handler(query_params).await {
        Ok(result) => Response::success(request_id, result),
        Err(msg) => Response::error(request_id, "query_error", msg),
    }
}
1385
/// Handles `graph.nodes`: responds with the `nodes` of the database's
/// dependency graph, serialized as JSON.
#[cfg(feature = "std")]
async fn handle_graph_nodes<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    #[cfg(feature = "tracing")]
    tracing::debug!("Getting dependency graph nodes");

    let dependency_graph = db.inner().dependency_graph();
    let node_list = &dependency_graph.nodes;

    #[cfg(feature = "tracing")]
    tracing::debug!("Returning {} graph nodes", node_list.len());

    let payload = json!(node_list);
    Response::success(request_id, payload)
}
1423
/// Handles `graph.edges`: responds with the `edges` of the database's
/// dependency graph, serialized as JSON.
#[cfg(feature = "std")]
async fn handle_graph_edges<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    #[cfg(feature = "tracing")]
    tracing::debug!("Getting dependency graph edges");

    let dependency_graph = db.inner().dependency_graph();
    let edge_list = &dependency_graph.edges;

    #[cfg(feature = "tracing")]
    tracing::debug!("Returning {} graph edges", edge_list.len());

    let payload = json!(edge_list);
    Response::success(request_id, payload)
}
1454
/// Handles `graph.topo_order`: responds with the result of the dependency
/// graph's `topo_order()`, serialized as JSON.
#[cfg(feature = "std")]
async fn handle_graph_topo_order<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    #[cfg(feature = "tracing")]
    tracing::debug!("Getting topological order");

    let dependency_graph = db.inner().dependency_graph();
    let ordered = dependency_graph.topo_order();

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Returning topological order with {} records",
        ordered.len()
    );

    let payload = json!(ordered);
    Response::success(request_id, payload)
}