1use crate::remote::{
15 AimxConfig, Event, HelloMessage, RecordMetadata, Request, Response, WelcomeMessage,
16};
17use crate::{AimDb, DbError, DbResult};
18
19#[cfg(feature = "std")]
20use std::collections::HashMap;
21#[cfg(feature = "std")]
22use std::sync::Arc;
23
24#[cfg(feature = "std")]
25use serde_json::json;
26#[cfg(feature = "std")]
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
28#[cfg(feature = "std")]
29use tokio::net::UnixStream;
30#[cfg(feature = "std")]
31use tokio::sync::mpsc;
32#[cfg(feature = "std")]
33use tokio::sync::oneshot;
34
/// Bookkeeping entry for one active subscription on a connection.
///
/// Instances are stored in `ConnectionState::subscriptions`, keyed by
/// `subscription_id`.
#[cfg(feature = "std")]
// `record_name` is only read by tracing output in this file, which is
// presumably why the `dead_code` allowance is needed.
#[allow(dead_code)]
struct SubscriptionHandle {
    /// Identifier returned to the client when the subscription was created.
    subscription_id: String,

    /// Name of the record this subscription was requested for.
    record_name: String,

    /// One-shot sender; sending `()` on it signals cancellation of the
    /// subscription.
    cancel_tx: oneshot::Sender<()>,
}
51
/// Per-connection mutable state shared by the request handlers.
#[cfg(feature = "std")]
struct ConnectionState {
    /// Active subscriptions, keyed by subscription id.
    subscriptions: HashMap<String, SubscriptionHandle>,

    /// Counter used to build the next `sub-N` subscription id.
    next_subscription_id: u64,

    /// Sender side of the connection's event channel; cloned into each
    /// subscription streaming task.
    event_tx: mpsc::UnboundedSender<Event>,

    /// Buffer readers created by `record.drain`, keyed by record name, kept so
    /// later drain calls on the same record reuse the same reader.
    drain_readers: HashMap<String, Box<dyn crate::buffer::JsonBufferReader + Send>>,
}
71
#[cfg(feature = "std")]
impl ConnectionState {
    /// Builds an empty state whose subscription ids start at 1 and whose
    /// events are published through `event_tx`.
    fn new(event_tx: mpsc::UnboundedSender<Event>) -> Self {
        let subscriptions = HashMap::new();
        let drain_readers = HashMap::new();

        Self {
            subscriptions,
            next_subscription_id: 1,
            event_tx,
            drain_readers,
        }
    }

    /// Returns the next id of the form `sub-N` and advances the counter.
    fn generate_subscription_id(&mut self) -> String {
        let current = self.next_subscription_id;
        self.next_subscription_id = current + 1;
        format!("sub-{}", current)
    }

    /// Registers `handle` under its own subscription id, replacing any
    /// existing entry with the same id.
    fn add_subscription(&mut self, handle: SubscriptionHandle) {
        let key = handle.subscription_id.clone();
        self.subscriptions.insert(key, handle);
    }

    /// Removes and returns the handle registered under `subscription_id`,
    /// or `None` when no such subscription exists.
    #[allow(dead_code)]
    fn remove_subscription(&mut self, subscription_id: &str) -> Option<SubscriptionHandle> {
        self.subscriptions.remove(subscription_id)
    }

    /// Empties the subscription table, signalling cancellation on every
    /// handle. Send failures are ignored.
    async fn cancel_all_subscriptions(&mut self) {
        #[cfg(feature = "tracing")]
        tracing::info!(
            "Canceling {} active subscriptions",
            self.subscriptions.len()
        );

        self.subscriptions.drain().for_each(|(_, handle)| {
            // A failed send means the receiving side is already gone.
            let _ = handle.cancel_tx.send(());
        });
    }
}
120
/// Serves one remote-access client over `stream` until it disconnects.
///
/// Flow:
/// 1. Run the handshake (`perform_handshake`); a handshake failure is
///    returned to the caller as `Err`.
/// 2. Loop, multiplexing two sources with `tokio::select!`:
///    - newline-delimited JSON requests from the client, each answered with
///      exactly one response line;
///    - subscription events arriving on the connection's event channel, each
///      forwarded to the client as one line.
/// 3. On EOF, read error, or write error, cancel all subscriptions and
///    return `Ok(())`.
///
/// Requests are read with `Lines::next_line`, which is cancellation safe.
/// `read_line` is not: when the event branch of the `select!` completes
/// first, a partially read request line could be lost.
#[cfg(feature = "std")]
pub async fn handle_connection<R>(
    db: Arc<AimDb<R>>,
    config: AimxConfig,
    stream: UnixStream,
) -> DbResult<()>
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    #[cfg(feature = "tracing")]
    tracing::info!("New remote access connection established");

    let stream = match perform_handshake(stream, &config, &db).await {
        Ok(stream) => stream,
        Err(e) => {
            #[cfg(feature = "tracing")]
            tracing::warn!("Handshake failed: {}", e);
            return Err(e);
        }
    };

    #[cfg(feature = "tracing")]
    tracing::info!("Handshake complete, client ready");

    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<Event>();

    let mut conn_state = ConnectionState::new(event_tx);

    // `Lines` keeps partially received data in its own state between polls,
    // so dropping the `next_line` future inside `select!` loses nothing.
    let mut lines = stream.lines();

    loop {
        tokio::select! {
            read_result = lines.next_line() => {
                let line = match read_result {
                    Ok(Some(line)) => line,
                    Ok(None) => {
                        #[cfg(feature = "tracing")]
                        tracing::info!("Client disconnected gracefully");
                        break;
                    }
                    Err(_e) => {
                        #[cfg(feature = "tracing")]
                        tracing::error!("Error reading from stream: {}", _e);
                        break;
                    }
                };

                #[cfg(feature = "tracing")]
                tracing::debug!("Received request: {}", line.trim());

                let request: Request = match serde_json::from_str(line.trim()) {
                    Ok(req) => req,
                    Err(e) => {
                        #[cfg(feature = "tracing")]
                        tracing::warn!("Failed to parse request: {}", e);

                        // The request id is unknown when parsing fails, so 0 is used.
                        let error_response =
                            Response::error(0, "parse_error", format!("Invalid JSON: {}", e));
                        if let Err(_e) = send_response(lines.get_mut(), &error_response).await {
                            #[cfg(feature = "tracing")]
                            tracing::error!("Failed to send error response: {}", _e);
                            break;
                        }
                        continue;
                    }
                };

                let response = handle_request(&db, &config, &mut conn_state, request).await;

                if let Err(_e) = send_response(lines.get_mut(), &response).await {
                    #[cfg(feature = "tracing")]
                    tracing::error!("Failed to send response: {}", _e);
                    break;
                }
            }

            Some(event) = event_rx.recv() => {
                if let Err(_e) = send_event(lines.get_mut(), &event).await {
                    #[cfg(feature = "tracing")]
                    tracing::error!("Failed to send event: {}", _e);
                    break;
                }
            }
        }
    }

    conn_state.cancel_all_subscriptions().await;

    #[cfg(feature = "tracing")]
    tracing::info!("Connection handler terminating");

    Ok(())
}
258
/// Writes one subscription event to the client as a single line of JSON of
/// the form `{"event": <event>}`.
///
/// The payload and its trailing newline are sent in one `write_all` call, so
/// a write error cannot leave a line without its delimiter queued separately
/// and each event costs one write instead of two.
///
/// # Errors
///
/// Returns `DbError::JsonWithContext` when serialization fails and
/// `DbError::IoWithContext` when the socket write fails.
#[cfg(feature = "std")]
async fn send_event(stream: &mut BufReader<UnixStream>, event: &Event) -> DbResult<()> {
    let event_msg = json!({ "event": event });

    let mut frame = serde_json::to_string(&event_msg).map_err(|e| DbError::JsonWithContext {
        context: "Failed to serialize event".to_string(),
        source: e,
    })?;
    frame.push('\n');

    stream
        .get_mut()
        .write_all(frame.as_bytes())
        .await
        .map_err(|e| DbError::IoWithContext {
            context: "Failed to write event".to_string(),
            source: e,
        })?;

    #[cfg(feature = "tracing")]
    tracing::trace!("Sent event for subscription: {}", event.subscription_id);

    Ok(())
}
302
/// Writes one response to the client as a single line of JSON.
///
/// The payload and its trailing newline are sent in one `write_all` call, so
/// each response costs one write instead of two and the delimiter is never
/// queued separately from its message.
///
/// # Errors
///
/// Returns `DbError::JsonWithContext` when serialization fails and
/// `DbError::IoWithContext` when the socket write fails.
#[cfg(feature = "std")]
async fn send_response(stream: &mut BufReader<UnixStream>, response: &Response) -> DbResult<()> {
    let mut frame = serde_json::to_string(response).map_err(|e| DbError::JsonWithContext {
        context: "Failed to serialize response".to_string(),
        source: e,
    })?;
    frame.push('\n');

    stream
        .get_mut()
        .write_all(frame.as_bytes())
        .await
        .map_err(|e| DbError::IoWithContext {
            context: "Failed to write response".to_string(),
            source: e,
        })?;

    #[cfg(feature = "tracing")]
    tracing::debug!("Sent response");

    Ok(())
}
343
/// Best-effort delivery of a handshake rejection: writes `payload` followed
/// by a newline, then shuts down the write side. All I/O errors are ignored
/// because the connection is being abandoned anyway.
#[cfg(feature = "std")]
async fn send_handshake_rejection<W>(writer: &mut W, payload: &str)
where
    W: AsyncWriteExt + Unpin,
{
    let _ = writer.write_all(payload.as_bytes()).await;
    let _ = writer.write_all(b"\n").await;
    let _ = writer.shutdown().await;
}

/// Performs the Hello/Welcome handshake and returns the buffered stream used
/// for the rest of the connection.
///
/// Steps:
/// 1. Read one line and parse it as a `HelloMessage`.
/// 2. Reject protocol versions other than `"1.0"` / `"1"`.
/// 3. When `config.auth_token` is set, require a matching token from the
///    client; when it is not set, the client is reported as unauthenticated.
/// 4. Send a `WelcomeMessage` describing permissions and writable records.
///
/// The same `BufReader` is used for reading the Hello and for all later
/// traffic, so bytes the client sent right after its Hello line stay
/// buffered instead of being discarded.
///
/// # Errors
///
/// - `DbError::IoWithContext` on read/write failures.
/// - `DbError::JsonWithContext` when the Hello cannot be parsed or the
///   Welcome cannot be serialized.
/// - `DbError::InvalidOperation` for an unsupported protocol version.
/// - `DbError::PermissionDenied` for a missing or wrong auth token.
#[cfg(feature = "std")]
async fn perform_handshake<R>(
    stream: UnixStream,
    config: &AimxConfig,
    db: &Arc<AimDb<R>>,
) -> DbResult<BufReader<UnixStream>>
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    let mut stream = BufReader::new(stream);

    let mut line = String::new();
    stream
        .read_line(&mut line)
        .await
        .map_err(|e| DbError::IoWithContext {
            context: "Failed to read Hello message".to_string(),
            source: e,
        })?;

    #[cfg(feature = "tracing")]
    tracing::debug!("Received handshake: {}", line.trim());

    let hello: HelloMessage =
        serde_json::from_str(line.trim()).map_err(|e| DbError::JsonWithContext {
            context: "Failed to parse Hello message".to_string(),
            source: e,
        })?;

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Client hello: version={}, client={}",
        hello.version,
        hello.client
    );

    if hello.version != "1.0" && hello.version != "1" {
        // The version string is client-controlled; build the payload through
        // serde_json so quotes and control characters are escaped properly.
        let error_msg = json!({
            "error": "unsupported_version",
            "message": format!(
                "Server supports version 1.0, client requested {}",
                hello.version
            ),
        })
        .to_string();

        #[cfg(feature = "tracing")]
        tracing::warn!("Unsupported version: {}", hello.version);

        send_handshake_rejection(stream.get_mut(), &error_msg).await;

        return Err(DbError::InvalidOperation {
            operation: "handshake".to_string(),
            reason: format!("Unsupported version: {}", hello.version),
        });
    }

    let authenticated = match (&config.auth_token, &hello.auth_token) {
        // No token configured: accept the client but report it as unauthenticated.
        (None, _) => false,
        (Some(expected_token), Some(provided_token)) if provided_token == expected_token => {
            #[cfg(feature = "tracing")]
            tracing::debug!("Authentication successful");
            true
        }
        (Some(_), Some(_)) => {
            #[cfg(feature = "tracing")]
            tracing::warn!("Authentication failed: invalid token");

            send_handshake_rejection(
                stream.get_mut(),
                r#"{"error":"authentication_failed","message":"Invalid auth token"}"#,
            )
            .await;

            return Err(DbError::PermissionDenied {
                operation: "authentication".to_string(),
            });
        }
        (Some(_), None) => {
            #[cfg(feature = "tracing")]
            tracing::warn!("Authentication failed: no token provided");

            send_handshake_rejection(
                stream.get_mut(),
                r#"{"error":"authentication_required","message":"Auth token required"}"#,
            )
            .await;

            return Err(DbError::PermissionDenied {
                operation: "authentication".to_string(),
            });
        }
    };

    let (permissions, writable_records) = match &config.security_policy {
        crate::remote::SecurityPolicy::ReadOnly => (vec!["read".to_string()], Vec::new()),
        crate::remote::SecurityPolicy::ReadWrite { .. } => {
            // The advertised list comes from the per-record `writable` flag
            // reported by the database, not from the policy's own set.
            let all_records: Vec<RecordMetadata> = db.list_records();
            let writable = all_records
                .into_iter()
                .filter(|meta| meta.writable)
                .map(|meta| meta.name)
                .collect();

            (vec!["read".to_string(), "write".to_string()], writable)
        }
    };

    let welcome = WelcomeMessage {
        version: "1.0".to_string(),
        server: "aimdb".to_string(),
        permissions,
        writable_records,
        max_subscriptions: Some(config.subscription_queue_size),
        authenticated: Some(authenticated),
    };

    let mut welcome_frame =
        serde_json::to_string(&welcome).map_err(|e| DbError::JsonWithContext {
            context: "Failed to serialize Welcome message".to_string(),
            source: e,
        })?;
    welcome_frame.push('\n');

    stream
        .get_mut()
        .write_all(welcome_frame.as_bytes())
        .await
        .map_err(|e| DbError::IoWithContext {
            context: "Failed to write Welcome message".to_string(),
            source: e,
        })?;

    #[cfg(feature = "tracing")]
    tracing::info!("Sent Welcome message to client");

    Ok(stream)
}
537
/// Routes a parsed request to the handler for its `method` and returns that
/// handler's response.
///
/// Methods that are not recognised (including feature-gated ones that were
/// compiled out) yield a `method_not_found` error response.
#[cfg(feature = "std")]
async fn handle_request<R>(
    db: &Arc<AimDb<R>>,
    config: &AimxConfig,
    conn_state: &mut ConnectionState,
    request: Request,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    // Take the request apart once so each handler receives owned pieces.
    let Request {
        id, method, params, ..
    } = request;

    #[cfg(feature = "tracing")]
    tracing::debug!("Handling request: method={}, id={}", method, id);

    match method.as_str() {
        "record.list" => handle_record_list(db, config, id).await,
        "record.get" => handle_record_get(db, config, id, params).await,
        "record.set" => handle_record_set(db, config, id, params).await,
        "record.subscribe" => handle_record_subscribe(db, config, conn_state, id, params).await,
        "record.unsubscribe" => handle_record_unsubscribe(conn_state, id, params).await,
        "record.drain" => handle_record_drain(db, conn_state, id, params).await,
        "record.query" => handle_record_query(db, id, params).await,
        "graph.nodes" => handle_graph_nodes(db, id).await,
        "graph.edges" => handle_graph_edges(db, id).await,
        "graph.topo_order" => handle_graph_topo_order(db, id).await,
        #[cfg(feature = "profiling")]
        "profiling.reset" => handle_profiling_reset(db, config, id).await,
        #[cfg(feature = "metrics")]
        "buffer_metrics.reset" => handle_buffer_metrics_reset(db, config, id).await,
        unknown => {
            #[cfg(feature = "tracing")]
            tracing::warn!("Unknown method: {}", unknown);

            Response::error(
                id,
                "method_not_found",
                format!("Unknown method: {}", unknown),
            )
        }
    }
}
598
/// Handles `record.list`: responds with the metadata of every record the
/// database reports, serialized as a JSON array.
#[cfg(feature = "std")]
async fn handle_record_list<R>(
    db: &Arc<AimDb<R>>,
    _config: &AimxConfig,
    request_id: u64,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    #[cfg(feature = "tracing")]
    tracing::debug!("Listing records");

    let listing: Vec<RecordMetadata> = db.list_records();

    #[cfg(feature = "tracing")]
    tracing::debug!("Found {} records", listing.len());

    let payload = json!(listing);
    Response::success(request_id, payload)
}
631
/// Handles `profiling.reset`: resets the stage profiling counters.
///
/// Refused with `permission_denied` under the `ReadOnly` security policy.
#[cfg(all(feature = "std", feature = "profiling"))]
async fn handle_profiling_reset<R>(
    db: &Arc<AimDb<R>>,
    config: &AimxConfig,
    request_id: u64,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    match &config.security_policy {
        crate::remote::SecurityPolicy::ReadOnly => Response::error(
            request_id,
            "permission_denied",
            "profiling.reset requires write permission (ReadOnly security policy)".to_string(),
        ),
        _ => {
            db.reset_stage_profiling();

            #[cfg(feature = "tracing")]
            tracing::info!("Stage profiling counters reset");

            Response::success(request_id, json!({ "reset": true }))
        }
    }
}
662
/// Handles `buffer_metrics.reset`: resets the buffer metrics counters.
///
/// Refused with `permission_denied` under the `ReadOnly` security policy.
#[cfg(all(feature = "std", feature = "metrics"))]
async fn handle_buffer_metrics_reset<R>(
    db: &Arc<AimDb<R>>,
    config: &AimxConfig,
    request_id: u64,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    match &config.security_policy {
        crate::remote::SecurityPolicy::ReadOnly => Response::error(
            request_id,
            "permission_denied",
            "buffer_metrics.reset requires write permission (ReadOnly security policy)".to_string(),
        ),
        _ => {
            db.reset_buffer_metrics();

            #[cfg(feature = "tracing")]
            tracing::info!("Buffer metrics counters reset");

            Response::success(request_id, json!({ "reset": true }))
        }
    }
}
693
/// Handles `record.get`: returns the latest JSON value of one record.
///
/// Expects `params` to be an object with a string field `record` naming the
/// record. Responds with `invalid_params` when that is not the case and with
/// `not_found` when the database has no value for the record.
#[cfg(feature = "std")]
async fn handle_record_get<R>(
    db: &Arc<AimDb<R>>,
    _config: &AimxConfig,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    let Some(serde_json::Value::Object(fields)) = params else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing params object");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing params object".to_string(),
        );
    };

    let Some(serde_json::Value::String(name)) = fields.get("record") else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing or invalid 'record' parameter");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing or invalid 'record' parameter".to_string(),
        );
    };
    let record_name = name.clone();

    #[cfg(feature = "tracing")]
    tracing::debug!("Getting value for record: {}", record_name);

    let Some(value) = db.try_latest_as_json(&record_name) else {
        #[cfg(feature = "tracing")]
        tracing::warn!("No value available for record: {}", record_name);

        return Response::error(
            request_id,
            "not_found",
            format!("No value available for record: {}", record_name),
        );
    };

    #[cfg(feature = "tracing")]
    tracing::debug!("Successfully retrieved value for {}", record_name);

    Response::success(request_id, value)
}
770
/// Handles `record.set`: writes a JSON value into a record.
///
/// Checks, in order:
/// 1. the security policy must be `ReadWrite` (`permission_denied` otherwise);
/// 2. `params` must be an object with a string `name` and a `value`
///    (`invalid_params` otherwise);
/// 3. `name` must be in the policy's `writable_records`
///    (`permission_denied` otherwise).
///
/// On success the response carries `"status": "success"` and, when the
/// database can produce it, the record's latest value under `"value"`.
/// Database errors are mapped to `not_found`, `permission_denied`,
/// `validation_error`, or `internal_error`.
#[cfg(feature = "std")]
async fn handle_record_set<R>(
    db: &Arc<AimDb<R>>,
    config: &AimxConfig,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    use crate::remote::SecurityPolicy;

    let writable_records = match &config.security_policy {
        SecurityPolicy::ReadWrite { writable_records } => writable_records,
        SecurityPolicy::ReadOnly => {
            #[cfg(feature = "tracing")]
            tracing::warn!("record.set called but security policy is ReadOnly");

            return Response::error(
                request_id,
                "permission_denied",
                "Write operations not allowed (ReadOnly security policy)".to_string(),
            );
        }
    };

    let Some(serde_json::Value::Object(fields)) = params else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing params object in record.set");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing params object".to_string(),
        );
    };

    let Some(serde_json::Value::String(name)) = fields.get("name") else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing or invalid 'name' parameter in record.set");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing or invalid 'name' parameter (expected string)".to_string(),
        );
    };
    let record_name = name.clone();

    let Some(value) = fields.get("value").cloned() else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing 'value' parameter in record.set");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing 'value' parameter".to_string(),
        );
    };

    #[cfg(feature = "tracing")]
    tracing::debug!("Setting value for record: {}", record_name);

    if !writable_records.contains(&record_name) {
        #[cfg(feature = "tracing")]
        tracing::warn!("Record '{}' not in writable_records set", record_name);

        return Response::error(
            request_id,
            "permission_denied",
            format!(
                "Record '{}' is not writable. \
                 Configure with .with_writable_record() to allow writes.",
                record_name
            ),
        );
    }

    if let Err(e) = db.set_record_from_json(&record_name, value) {
        #[cfg(feature = "tracing")]
        tracing::error!("Failed to set value for record '{}': {}", record_name, e);

        // Translate database errors into protocol-level error codes.
        let (code, message) = match e {
            crate::DbError::RecordKeyNotFound { key } => {
                ("not_found", format!("Record '{}' not found", key))
            }
            crate::DbError::PermissionDenied { operation } => ("permission_denied", operation),
            crate::DbError::JsonWithContext { context, .. } => (
                "validation_error",
                format!("JSON validation failed: {}", context),
            ),
            crate::DbError::RuntimeError { message } => ("internal_error", message),
            _ => ("internal_error", format!("Failed to set value: {}", e)),
        };

        return Response::error(request_id, code, message);
    }

    #[cfg(feature = "tracing")]
    tracing::info!("Successfully set value for record: {}", record_name);

    // Echo the stored value back when it can be read.
    let result = match db.try_latest_as_json(&record_name) {
        Some(updated_value) => serde_json::json!({
            "status": "success",
            "value": updated_value,
        }),
        None => serde_json::json!({
            "status": "success",
        }),
    };

    Response::success(request_id, result)
}
928
/// Handles `record.subscribe`: starts streaming updates of one record to the
/// client.
///
/// Expects `params` to be an object with a string `name`. Any `send_initial`
/// field is currently ignored; this function never acted on it.
///
/// The number of concurrent subscriptions per connection is capped at
/// `config.subscription_queue_size`, the same value passed to the database
/// as the per-subscription queue size.
///
/// A subscription id is allocated only after the database subscription has
/// been created, so failed attempts do not consume ids.
///
/// Error responses: `invalid_params`, `too_many_subscriptions`, `not_found`,
/// `internal_error`. On success the response carries `subscription_id` and
/// `queue_size`.
#[cfg(feature = "std")]
async fn handle_record_subscribe<R>(
    db: &Arc<AimDb<R>>,
    config: &AimxConfig,
    conn_state: &mut ConnectionState,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    let Some(serde_json::Value::Object(fields)) = params else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing params object in record.subscribe");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing params object".to_string(),
        );
    };

    let Some(serde_json::Value::String(name)) = fields.get("name") else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Missing or invalid 'name' parameter in record.subscribe");

        return Response::error(
            request_id,
            "invalid_params",
            "Missing or invalid 'name' parameter (expected string)".to_string(),
        );
    };
    let record_name = name.clone();

    #[cfg(feature = "tracing")]
    tracing::debug!("Subscribing to record: {}", record_name);

    if conn_state.subscriptions.len() >= config.subscription_queue_size {
        #[cfg(feature = "tracing")]
        tracing::warn!(
            "Too many subscriptions: {} (max: {})",
            conn_state.subscriptions.len(),
            config.subscription_queue_size
        );

        return Response::error(
            request_id,
            "too_many_subscriptions",
            format!(
                "Maximum subscriptions reached: {}",
                config.subscription_queue_size
            ),
        );
    }

    let (value_rx, cancel_tx) =
        match db.subscribe_record_updates(&record_name, config.subscription_queue_size) {
            Ok(channels) => channels,
            Err(e) => {
                let (code, message) = match &e {
                    crate::DbError::RecordKeyNotFound { key } => {
                        #[cfg(feature = "tracing")]
                        tracing::warn!("Record not found: {}", key);
                        ("not_found", format!("Record '{}' not found", key))
                    }
                    _ => {
                        #[cfg(feature = "tracing")]
                        tracing::error!("Failed to subscribe to record updates: {}", e);
                        ("internal_error", format!("Failed to subscribe: {}", e))
                    }
                };

                return Response::error(request_id, code, message);
            }
        };

    // Allocate the id only now that the subscription actually exists.
    let subscription_id = conn_state.generate_subscription_id();

    // Detached task: it ends on its own when `value_rx` or the event channel
    // closes, so the join handle is intentionally dropped.
    let event_tx = conn_state.event_tx.clone();
    let task_subscription_id = subscription_id.clone();
    drop(tokio::spawn(async move {
        stream_subscription_events(task_subscription_id, value_rx, event_tx).await;
    }));

    conn_state.add_subscription(SubscriptionHandle {
        subscription_id: subscription_id.clone(),
        record_name: record_name.clone(),
        cancel_tx,
    });

    #[cfg(feature = "tracing")]
    tracing::info!(
        "Created subscription {} for record {}",
        subscription_id,
        record_name
    );

    Response::success(
        request_id,
        json!({
            "subscription_id": subscription_id,
            "queue_size": config.subscription_queue_size,
        }),
    )
}
1073
/// Forwards values of one subscription to the connection's event channel.
///
/// Each value received on `value_rx` is wrapped in an `Event` carrying the
/// subscription id, a sequence number starting at 1, and a
/// `seconds.nanoseconds` timestamp taken from the system clock relative to
/// the Unix epoch. The task ends when `value_rx` is closed or when the event
/// channel no longer has a receiver.
#[cfg(feature = "std")]
async fn stream_subscription_events(
    subscription_id: String,
    mut value_rx: tokio::sync::mpsc::Receiver<serde_json::Value>,
    event_tx: tokio::sync::mpsc::UnboundedSender<Event>,
) {
    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Event streaming task started for subscription: {}",
        subscription_id
    );

    let mut next_sequence: u64 = 1;

    loop {
        let Some(data) = value_rx.recv().await else {
            break;
        };

        // A clock set before the epoch yields a zero duration instead of an error.
        let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
        let timestamp = format!(
            "{}.{:09}",
            since_epoch.as_secs(),
            since_epoch.subsec_nanos()
        );

        let event = Event {
            subscription_id: subscription_id.clone(),
            sequence: next_sequence,
            data,
            timestamp,
            dropped: None,
        };

        if event_tx.send(event).is_err() {
            #[cfg(feature = "tracing")]
            tracing::debug!(
                "Event channel closed, terminating stream for subscription: {}",
                subscription_id
            );
            break;
        }

        next_sequence += 1;
    }

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Event streaming task terminated for subscription: {}",
        subscription_id
    );
}
1132
/// Handles `record.unsubscribe`: cancels one subscription of this connection.
///
/// Expects `params` to be an object with a string `subscription_id`.
/// Responds with `invalid_params` when it is missing or malformed and with
/// `not_found` when no such subscription is registered.
#[cfg(feature = "std")]
async fn handle_record_unsubscribe(
    conn_state: &mut ConnectionState,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response {
    let Some(serde_json::Value::Object(fields)) = params else {
        return Response::error(
            request_id,
            "invalid_params",
            "Missing 'subscription_id' parameter".to_string(),
        );
    };

    let Some(serde_json::Value::String(id)) = fields.get("subscription_id") else {
        return Response::error(
            request_id,
            "invalid_params",
            "Missing or invalid 'subscription_id' parameter".to_string(),
        );
    };
    let subscription_id = id.clone();

    #[cfg(feature = "tracing")]
    tracing::debug!("Unsubscribing from subscription_id: {}", subscription_id);

    let Some(handle) = conn_state.remove_subscription(&subscription_id) else {
        #[cfg(feature = "tracing")]
        tracing::warn!("Subscription not found: {}", subscription_id);

        return Response::error(
            request_id,
            "not_found",
            format!("Subscription '{}' not found", subscription_id),
        );
    };

    // A failed send means the receiving side is already gone.
    let _ = handle.cancel_tx.send(());

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Cancelled subscription {} for record {}",
        subscription_id,
        handle.record_name
    );

    Response::success(
        request_id,
        serde_json::json!({
            "subscription_id": subscription_id,
            "status": "cancelled"
        }),
    )
}
1208
/// Handles `record.drain`: returns the values currently pending on this
/// connection's reader for a record.
///
/// Expects `params` to be an object with a string `name` and an optional
/// unsigned integer `limit`; without a usable `limit` every pending value is
/// returned.
///
/// The first drain of a record on a connection creates a JSON reader for it
/// and caches it in `conn_state.drain_readers`; later drains reuse it.
/// Reader creation failures are reported as `not_found` or
/// `remote_access_not_enabled`.
///
/// The response carries `record_name`, `values`, and `count`.
#[cfg(feature = "std")]
async fn handle_record_drain<R>(
    db: &Arc<AimDb<R>>,
    conn_state: &mut ConnectionState,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    let Some(serde_json::Value::Object(fields)) = params else {
        return Response::error(
            request_id,
            "invalid_params",
            "Missing params object".to_string(),
        );
    };

    let Some(serde_json::Value::String(name)) = fields.get("name") else {
        return Response::error(
            request_id,
            "invalid_params",
            "Missing or invalid 'name' parameter (expected string)".to_string(),
        );
    };
    let record_name = name.clone();

    // `usize::MAX` stands for "no limit"; oversized limits saturate to it.
    let limit = fields
        .get("limit")
        .and_then(|v| v.as_u64())
        .map_or(usize::MAX, |v| usize::try_from(v).unwrap_or(usize::MAX));

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Draining record: {} (limit: {})",
        record_name,
        if limit == usize::MAX {
            "all".to_string()
        } else {
            limit.to_string()
        }
    );

    // Look up the cached reader, creating and caching it on first use.
    let reader = match conn_state.drain_readers.entry(record_name.clone()) {
        std::collections::hash_map::Entry::Occupied(existing) => existing.into_mut(),
        std::collections::hash_map::Entry::Vacant(slot) => {
            let Some(id) = db.inner().resolve_str(&record_name) else {
                return Response::error(
                    request_id,
                    "not_found",
                    format!("Record '{}' not found", record_name),
                );
            };

            let Some(record) = db.inner().storage(id) else {
                return Response::error(
                    request_id,
                    "not_found",
                    format!("Record '{}' storage not found", record_name),
                );
            };

            let new_reader = match record.subscribe_json() {
                Ok(r) => r,
                Err(e) => {
                    return Response::error(
                        request_id,
                        "remote_access_not_enabled",
                        format!(
                            "Record '{}' not configured with .with_remote_access(): {}",
                            record_name, e
                        ),
                    );
                }
            };

            slot.insert(new_reader)
        }
    };

    let mut values = Vec::new();

    while values.len() < limit {
        match reader.try_recv_json() {
            Ok(val) => values.push(val),
            // After a lag, keep reading whatever the reader still has.
            Err(DbError::BufferLagged { .. }) => {
                #[cfg(feature = "tracing")]
                tracing::warn!(
                    "Drain reader lagged for record '{}' — some values were lost",
                    record_name
                );
            }
            // `BufferEmpty` and every other error end the drain.
            Err(_) => break,
        }
    }

    let count = values.len();

    #[cfg(feature = "tracing")]
    tracing::debug!("Drained {} values from record '{}'", count, record_name);

    Response::success(
        request_id,
        json!({
            "record_name": record_name,
            "values": values,
            "count": count,
        }),
    )
}
1360
/// Boxed, thread-safe async callback that answers `record.query` requests.
///
/// The callback receives the parsed [`QueryHandlerParams`] and returns a
/// boxed future resolving to either the JSON result or an error message.
/// `handle_record_query` looks a value of this type up in the database
/// extensions and responds with `not_configured` when none is registered.
pub type QueryHandlerFn = Box<
    dyn Fn(
            QueryHandlerParams,
        ) -> core::pin::Pin<
            Box<dyn core::future::Future<Output = Result<serde_json::Value, String>> + Send>,
        > + Send
        + Sync,
>;
1380
/// Parameters of a `record.query` request, as passed to a [`QueryHandlerFn`].
#[derive(Debug, Clone)]
pub struct QueryHandlerParams {
    /// Value of the request's `name` field; `"*"` when the request did not
    /// supply one.
    pub name: String,
    /// Optional maximum number of results; `None` when absent or not
    /// representable as `usize`.
    pub limit: Option<usize>,
    /// Optional lower bound taken from the request's `start` field.
    /// NOTE(review): unit and inclusiveness are defined by the handler — confirm.
    pub start: Option<u64>,
    /// Optional upper bound taken from the request's `end` field.
    /// NOTE(review): unit and inclusiveness are defined by the handler — confirm.
    pub end: Option<u64>,
}
1393
/// Handles `record.query`: delegates to the [`QueryHandlerFn`] registered in
/// the database extensions.
///
/// All parameters are optional: `name` defaults to `"*"`, and `limit`,
/// `start`, and `end` default to `None`. A missing or non-object `params`
/// is treated as "all defaults".
///
/// Responds with `not_configured` when no handler is registered and with
/// `query_error` when the handler returns an error message.
#[cfg(feature = "std")]
async fn handle_record_query<R>(
    db: &Arc<AimDb<R>>,
    request_id: u64,
    params: Option<serde_json::Value>,
) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    let Some(handler) = db.extensions().get::<QueryHandlerFn>() else {
        return Response::error(
            request_id,
            "not_configured",
            "Persistence not configured. Call .with_persistence() on the builder.".to_string(),
        );
    };

    // `None` when params are absent or not a JSON object; every field then
    // falls back to its default.
    let fields = params.as_ref().and_then(|p| p.as_object());
    let unsigned = |key: &str| fields.and_then(|m| m.get(key)).and_then(|v| v.as_u64());

    let query_params = QueryHandlerParams {
        name: fields
            .and_then(|m| m.get("name"))
            .and_then(|v| v.as_str())
            .unwrap_or("*")
            .to_string(),
        limit: unsigned("limit").and_then(|v| usize::try_from(v).ok()),
        start: unsigned("start"),
        end: unsigned("end"),
    };

    match handler(query_params).await {
        Ok(rows) => Response::success(request_id, rows),
        Err(reason) => Response::error(request_id, "query_error", reason),
    }
}
1451
/// Handles `graph.nodes`: responds with the nodes of the database's
/// dependency graph, serialized as JSON.
#[cfg(feature = "std")]
async fn handle_graph_nodes<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    #[cfg(feature = "tracing")]
    tracing::debug!("Getting dependency graph nodes");

    let graph = db.inner().dependency_graph();

    #[cfg(feature = "tracing")]
    tracing::debug!("Returning {} graph nodes", graph.nodes.len());

    let payload = json!(graph.nodes);
    Response::success(request_id, payload)
}
1489
/// Handles `graph.edges`: responds with the edges of the database's
/// dependency graph, serialized as JSON.
#[cfg(feature = "std")]
async fn handle_graph_edges<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    #[cfg(feature = "tracing")]
    tracing::debug!("Getting dependency graph edges");

    let graph = db.inner().dependency_graph();

    #[cfg(feature = "tracing")]
    tracing::debug!("Returning {} graph edges", graph.edges.len());

    let payload = json!(graph.edges);
    Response::success(request_id, payload)
}
1520
/// Handles `graph.topo_order`: responds with the topological order reported
/// by the database's dependency graph, serialized as JSON.
#[cfg(feature = "std")]
async fn handle_graph_topo_order<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
where
    R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
    #[cfg(feature = "tracing")]
    tracing::debug!("Getting topological order");

    let graph = db.inner().dependency_graph();
    let ordered = graph.topo_order();

    #[cfg(feature = "tracing")]
    tracing::debug!(
        "Returning topological order with {} records",
        ordered.len()
    );

    let payload = json!(ordered);
    Response::success(request_id, payload)
}