use crate::remote::{
AimxConfig, Event, HelloMessage, RecordMetadata, Request, Response, WelcomeMessage,
};
use crate::{AimDb, DbError, DbResult};
#[cfg(feature = "std")]
use std::collections::HashMap;
#[cfg(feature = "std")]
use std::sync::Arc;
#[cfg(feature = "std")]
use serde_json::json;
#[cfg(feature = "std")]
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
#[cfg(feature = "std")]
use tokio::net::UnixStream;
#[cfg(feature = "std")]
use tokio::sync::mpsc;
#[cfg(feature = "std")]
use tokio::sync::oneshot;
#[cfg(feature = "std")]
#[allow(dead_code)] struct SubscriptionHandle {
subscription_id: String,
record_name: String,
cancel_tx: oneshot::Sender<()>,
}
#[cfg(feature = "std")]
struct ConnectionState {
subscriptions: HashMap<String, SubscriptionHandle>,
next_subscription_id: u64,
event_tx: mpsc::UnboundedSender<Event>,
drain_readers: HashMap<String, Box<dyn crate::buffer::JsonBufferReader + Send>>,
}
#[cfg(feature = "std")]
impl ConnectionState {
fn new(event_tx: mpsc::UnboundedSender<Event>) -> Self {
Self {
subscriptions: HashMap::new(),
next_subscription_id: 1,
event_tx,
drain_readers: HashMap::new(),
}
}
fn generate_subscription_id(&mut self) -> String {
let id = format!("sub-{}", self.next_subscription_id);
self.next_subscription_id += 1;
id
}
fn add_subscription(&mut self, handle: SubscriptionHandle) {
self.subscriptions
.insert(handle.subscription_id.clone(), handle);
}
#[allow(dead_code)]
fn remove_subscription(&mut self, subscription_id: &str) -> Option<SubscriptionHandle> {
self.subscriptions.remove(subscription_id)
}
async fn cancel_all_subscriptions(&mut self) {
#[cfg(feature = "tracing")]
tracing::info!(
"Canceling {} active subscriptions",
self.subscriptions.len()
);
for (_id, handle) in self.subscriptions.drain() {
let _ = handle.cancel_tx.send(());
}
}
}
#[cfg(feature = "std")]
pub async fn handle_connection<R>(
db: Arc<AimDb<R>>,
config: AimxConfig,
stream: UnixStream,
) -> DbResult<()>
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
#[cfg(feature = "tracing")]
tracing::info!("New remote access connection established");
let mut stream = match perform_handshake(stream, &config, &db).await {
Ok(stream) => stream,
Err(e) => {
#[cfg(feature = "tracing")]
tracing::warn!("Handshake failed: {}", e);
return Err(e);
}
};
#[cfg(feature = "tracing")]
tracing::info!("Handshake complete, client ready");
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<Event>();
let mut conn_state = ConnectionState::new(event_tx);
loop {
let mut line = String::new();
tokio::select! {
read_result = stream.read_line(&mut line) => {
match read_result {
Ok(0) => {
#[cfg(feature = "tracing")]
tracing::info!("Client disconnected gracefully");
break;
}
Ok(_) => {
#[cfg(feature = "tracing")]
tracing::debug!("Received request: {}", line.trim());
let request: Request = match serde_json::from_str(line.trim()) {
Ok(req) => req,
Err(e) => {
#[cfg(feature = "tracing")]
tracing::warn!("Failed to parse request: {}", e);
let error_response =
Response::error(0, "parse_error", format!("Invalid JSON: {}", e));
if let Err(_e) = send_response(&mut stream, &error_response).await {
#[cfg(feature = "tracing")]
tracing::error!("Failed to send error response: {}", _e);
break;
}
continue;
}
};
let response = handle_request(&db, &config, &mut conn_state, request).await;
if let Err(_e) = send_response(&mut stream, &response).await {
#[cfg(feature = "tracing")]
tracing::error!("Failed to send response: {}", _e);
break;
}
}
Err(_e) => {
#[cfg(feature = "tracing")]
tracing::error!("Error reading from stream: {}", _e);
break;
}
}
}
Some(event) = event_rx.recv() => {
if let Err(_e) = send_event(&mut stream, &event).await {
#[cfg(feature = "tracing")]
tracing::error!("Failed to send event: {}", _e);
break;
}
}
}
}
conn_state.cancel_all_subscriptions().await;
#[cfg(feature = "tracing")]
tracing::info!("Connection handler terminating");
Ok(())
}
#[cfg(feature = "std")]
async fn send_event(stream: &mut BufReader<UnixStream>, event: &Event) -> DbResult<()> {
let event_msg = json!({ "event": event });
let event_json = serde_json::to_string(&event_msg).map_err(|e| DbError::JsonWithContext {
context: "Failed to serialize event".to_string(),
source: e,
})?;
stream
.get_mut()
.write_all(event_json.as_bytes())
.await
.map_err(|e| DbError::IoWithContext {
context: "Failed to write event".to_string(),
source: e,
})?;
stream
.get_mut()
.write_all(b"\n")
.await
.map_err(|e| DbError::IoWithContext {
context: "Failed to write event newline".to_string(),
source: e,
})?;
#[cfg(feature = "tracing")]
tracing::trace!("Sent event for subscription: {}", event.subscription_id);
Ok(())
}
#[cfg(feature = "std")]
async fn send_response(stream: &mut BufReader<UnixStream>, response: &Response) -> DbResult<()> {
let response_json = serde_json::to_string(response).map_err(|e| DbError::JsonWithContext {
context: "Failed to serialize response".to_string(),
source: e,
})?;
stream
.get_mut()
.write_all(response_json.as_bytes())
.await
.map_err(|e| DbError::IoWithContext {
context: "Failed to write response".to_string(),
source: e,
})?;
stream
.get_mut()
.write_all(b"\n")
.await
.map_err(|e| DbError::IoWithContext {
context: "Failed to write response newline".to_string(),
source: e,
})?;
#[cfg(feature = "tracing")]
tracing::debug!("Sent response");
Ok(())
}
#[cfg(feature = "std")]
async fn perform_handshake<R>(
stream: UnixStream,
config: &AimxConfig,
db: &Arc<AimDb<R>>,
) -> DbResult<BufReader<UnixStream>>
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
reader
.read_line(&mut line)
.await
.map_err(|e| DbError::IoWithContext {
context: "Failed to read Hello message".to_string(),
source: e,
})?;
#[cfg(feature = "tracing")]
tracing::debug!("Received handshake: {}", line.trim());
let hello: HelloMessage =
serde_json::from_str(line.trim()).map_err(|e| DbError::JsonWithContext {
context: "Failed to parse Hello message".to_string(),
source: e,
})?;
#[cfg(feature = "tracing")]
tracing::debug!(
"Client hello: version={}, client={}",
hello.version,
hello.client
);
if hello.version != "1.0" && hello.version != "1" {
let error_msg = format!(
r#"{{"error":"unsupported_version","message":"Server supports version 1.0, client requested {}"}}"#,
hello.version
);
#[cfg(feature = "tracing")]
tracing::warn!("Unsupported version: {}", hello.version);
let _ = writer.write_all(error_msg.as_bytes()).await;
let _ = writer.write_all(b"\n").await;
let _ = writer.shutdown().await;
return Err(DbError::InvalidOperation {
operation: "handshake".to_string(),
reason: format!("Unsupported version: {}", hello.version),
});
}
let authenticated = if let Some(expected_token) = &config.auth_token {
match &hello.auth_token {
Some(provided_token) if provided_token == expected_token => {
#[cfg(feature = "tracing")]
tracing::debug!("Authentication successful");
true
}
Some(_) => {
let error_msg =
r#"{"error":"authentication_failed","message":"Invalid auth token"}"#;
#[cfg(feature = "tracing")]
tracing::warn!("Authentication failed: invalid token");
let _ = writer.write_all(error_msg.as_bytes()).await;
let _ = writer.write_all(b"\n").await;
let _ = writer.shutdown().await;
return Err(DbError::PermissionDenied {
operation: "authentication".to_string(),
});
}
None => {
let error_msg =
r#"{"error":"authentication_required","message":"Auth token required"}"#;
#[cfg(feature = "tracing")]
tracing::warn!("Authentication failed: no token provided");
let _ = writer.write_all(error_msg.as_bytes()).await;
let _ = writer.write_all(b"\n").await;
let _ = writer.shutdown().await;
return Err(DbError::PermissionDenied {
operation: "authentication".to_string(),
});
}
}
} else {
false
};
let permissions = match &config.security_policy {
crate::remote::SecurityPolicy::ReadOnly => vec!["read".to_string()],
crate::remote::SecurityPolicy::ReadWrite { .. } => {
vec!["read".to_string(), "write".to_string()]
}
};
let writable_records = match &config.security_policy {
crate::remote::SecurityPolicy::ReadOnly => vec![],
crate::remote::SecurityPolicy::ReadWrite {
writable_records: _writable_type_ids,
} => {
let all_records: Vec<RecordMetadata> = db.list_records();
all_records
.into_iter()
.filter(|meta| meta.writable)
.map(|meta| meta.name)
.collect()
}
};
let welcome = WelcomeMessage {
version: "1.0".to_string(),
server: "aimdb".to_string(),
permissions,
writable_records,
max_subscriptions: Some(config.subscription_queue_size),
authenticated: Some(authenticated),
};
let welcome_json = serde_json::to_string(&welcome).map_err(|e| DbError::JsonWithContext {
context: "Failed to serialize Welcome message".to_string(),
source: e,
})?;
writer
.write_all(welcome_json.as_bytes())
.await
.map_err(|e| DbError::IoWithContext {
context: "Failed to write Welcome message".to_string(),
source: e,
})?;
writer
.write_all(b"\n")
.await
.map_err(|e| DbError::IoWithContext {
context: "Failed to write Welcome newline".to_string(),
source: e,
})?;
#[cfg(feature = "tracing")]
tracing::info!("Sent Welcome message to client");
let stream = reader
.into_inner()
.reunite(writer)
.map_err(|e| DbError::Io {
source: std::io::Error::other(e.to_string()),
})?;
Ok(BufReader::new(stream))
}
#[cfg(feature = "std")]
async fn handle_request<R>(
db: &Arc<AimDb<R>>,
config: &AimxConfig,
conn_state: &mut ConnectionState,
request: Request,
) -> Response
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
#[cfg(feature = "tracing")]
tracing::debug!(
"Handling request: method={}, id={}",
request.method,
request.id
);
match request.method.as_str() {
"record.list" => handle_record_list(db, config, request.id).await,
"record.get" => handle_record_get(db, config, request.id, request.params).await,
"record.set" => handle_record_set(db, config, request.id, request.params).await,
"record.subscribe" => {
handle_record_subscribe(db, config, conn_state, request.id, request.params).await
}
"record.unsubscribe" => {
handle_record_unsubscribe(conn_state, request.id, request.params).await
}
"record.drain" => handle_record_drain(db, conn_state, request.id, request.params).await,
"record.query" => handle_record_query(db, request.id, request.params).await,
"graph.nodes" => handle_graph_nodes(db, request.id).await,
"graph.edges" => handle_graph_edges(db, request.id).await,
"graph.topo_order" => handle_graph_topo_order(db, request.id).await,
_ => {
#[cfg(feature = "tracing")]
tracing::warn!("Unknown method: {}", request.method);
Response::error(
request.id,
"method_not_found",
format!("Unknown method: {}", request.method),
)
}
}
}
#[cfg(feature = "std")]
async fn handle_record_list<R>(
db: &Arc<AimDb<R>>,
_config: &AimxConfig,
request_id: u64,
) -> Response
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
#[cfg(feature = "tracing")]
tracing::debug!("Listing records");
let records: Vec<RecordMetadata> = db.list_records();
#[cfg(feature = "tracing")]
tracing::debug!("Found {} records", records.len());
Response::success(request_id, json!(records))
}
#[cfg(feature = "std")]
async fn handle_record_get<R>(
db: &Arc<AimDb<R>>,
_config: &AimxConfig,
request_id: u64,
params: Option<serde_json::Value>,
) -> Response
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
let record_name = match params {
Some(serde_json::Value::Object(map)) => match map.get("record") {
Some(serde_json::Value::String(name)) => name.clone(),
_ => {
#[cfg(feature = "tracing")]
tracing::warn!("Missing or invalid 'record' parameter");
return Response::error(
request_id,
"invalid_params",
"Missing or invalid 'record' parameter".to_string(),
);
}
},
_ => {
#[cfg(feature = "tracing")]
tracing::warn!("Missing params object");
return Response::error(
request_id,
"invalid_params",
"Missing params object".to_string(),
);
}
};
#[cfg(feature = "tracing")]
tracing::debug!("Getting value for record: {}", record_name);
match db.try_latest_as_json(&record_name) {
Some(value) => {
#[cfg(feature = "tracing")]
tracing::debug!("Successfully retrieved value for {}", record_name);
Response::success(request_id, value)
}
None => {
#[cfg(feature = "tracing")]
tracing::warn!("No value available for record: {}", record_name);
Response::error(
request_id,
"not_found",
format!("No value available for record: {}", record_name),
)
}
}
}
#[cfg(feature = "std")]
async fn handle_record_set<R>(
db: &Arc<AimDb<R>>,
config: &AimxConfig,
request_id: u64,
params: Option<serde_json::Value>,
) -> Response
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
use crate::remote::SecurityPolicy;
let writable_records = match &config.security_policy {
SecurityPolicy::ReadOnly => {
#[cfg(feature = "tracing")]
tracing::warn!("record.set called but security policy is ReadOnly");
return Response::error(
request_id,
"permission_denied",
"Write operations not allowed (ReadOnly security policy)".to_string(),
);
}
SecurityPolicy::ReadWrite { writable_records } => writable_records,
};
let (record_name, value) = match params {
Some(serde_json::Value::Object(ref map)) => {
let name = match map.get("name") {
Some(serde_json::Value::String(n)) => n.clone(),
_ => {
#[cfg(feature = "tracing")]
tracing::warn!("Missing or invalid 'name' parameter in record.set");
return Response::error(
request_id,
"invalid_params",
"Missing or invalid 'name' parameter (expected string)".to_string(),
);
}
};
let val = match map.get("value") {
Some(v) => v.clone(),
None => {
#[cfg(feature = "tracing")]
tracing::warn!("Missing 'value' parameter in record.set");
return Response::error(
request_id,
"invalid_params",
"Missing 'value' parameter".to_string(),
);
}
};
(name, val)
}
_ => {
#[cfg(feature = "tracing")]
tracing::warn!("Missing params object in record.set");
return Response::error(
request_id,
"invalid_params",
"Missing params object".to_string(),
);
}
};
#[cfg(feature = "tracing")]
tracing::debug!("Setting value for record: {}", record_name);
if !writable_records.contains(&record_name) {
#[cfg(feature = "tracing")]
tracing::warn!("Record '{}' not in writable_records set", record_name);
return Response::error(
request_id,
"permission_denied",
format!(
"Record '{}' is not writable. \
Configure with .with_writable_record() to allow writes.",
record_name
),
);
}
match db.set_record_from_json(&record_name, value) {
Ok(()) => {
#[cfg(feature = "tracing")]
tracing::info!("Successfully set value for record: {}", record_name);
let result = if let Some(updated_value) = db.try_latest_as_json(&record_name) {
serde_json::json!({
"status": "success",
"value": updated_value,
})
} else {
serde_json::json!({
"status": "success",
})
};
Response::success(request_id, result)
}
Err(e) => {
#[cfg(feature = "tracing")]
tracing::error!("Failed to set value for record '{}': {}", record_name, e);
let (code, message) = match e {
crate::DbError::RecordKeyNotFound { key } => {
("not_found", format!("Record '{}' not found", key))
}
crate::DbError::PermissionDenied { operation } => {
("permission_denied", operation)
}
crate::DbError::JsonWithContext { context, .. } => (
"validation_error",
format!("JSON validation failed: {}", context),
),
crate::DbError::RuntimeError { message } => ("internal_error", message),
_ => ("internal_error", format!("Failed to set value: {}", e)),
};
Response::error(request_id, code, message)
}
}
}
#[cfg(feature = "std")]
async fn handle_record_subscribe<R>(
db: &Arc<AimDb<R>>,
config: &AimxConfig,
conn_state: &mut ConnectionState,
request_id: u64,
params: Option<serde_json::Value>,
) -> Response
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
let record_name = match params {
Some(serde_json::Value::Object(ref map)) => match map.get("name") {
Some(serde_json::Value::String(name)) => name.clone(),
_ => {
#[cfg(feature = "tracing")]
tracing::warn!("Missing or invalid 'name' parameter in record.subscribe");
return Response::error(
request_id,
"invalid_params",
"Missing or invalid 'name' parameter (expected string)".to_string(),
);
}
},
_ => {
#[cfg(feature = "tracing")]
tracing::warn!("Missing params object in record.subscribe");
return Response::error(
request_id,
"invalid_params",
"Missing params object".to_string(),
);
}
};
let _send_initial = params
.as_ref()
.and_then(|p| p.as_object())
.and_then(|map| map.get("send_initial"))
.and_then(|v| v.as_bool())
.unwrap_or(true);
#[cfg(feature = "tracing")]
tracing::debug!("Subscribing to record: {}", record_name);
if conn_state.subscriptions.len() >= config.subscription_queue_size {
#[cfg(feature = "tracing")]
tracing::warn!(
"Too many subscriptions: {} (max: {})",
conn_state.subscriptions.len(),
config.subscription_queue_size
);
return Response::error(
request_id,
"too_many_subscriptions",
format!(
"Maximum subscriptions reached: {}",
config.subscription_queue_size
),
);
}
let subscription_id = conn_state.generate_subscription_id();
let (value_rx, cancel_tx) =
match db.subscribe_record_updates(&record_name, config.subscription_queue_size) {
Ok(channels) => channels,
Err(e) => {
let (code, message) = match &e {
crate::DbError::RecordKeyNotFound { key } => {
#[cfg(feature = "tracing")]
tracing::warn!("Record not found: {}", key);
("not_found", format!("Record '{}' not found", key))
}
_ => {
#[cfg(feature = "tracing")]
tracing::error!("Failed to subscribe to record updates: {}", e);
("internal_error", format!("Failed to subscribe: {}", e))
}
};
return Response::error(request_id, code, message);
}
};
let event_tx = conn_state.event_tx.clone();
let sub_id_clone = subscription_id.clone();
let stream_handle = tokio::spawn(async move {
stream_subscription_events(sub_id_clone, value_rx, event_tx).await;
});
let handle = SubscriptionHandle {
subscription_id: subscription_id.clone(),
record_name: record_name.clone(),
cancel_tx,
};
conn_state.add_subscription(handle);
std::mem::drop(stream_handle);
#[cfg(feature = "tracing")]
tracing::info!(
"Created subscription {} for record {}",
subscription_id,
record_name
);
Response::success(
request_id,
json!({
"subscription_id": subscription_id,
"queue_size": config.subscription_queue_size,
}),
)
}
#[cfg(feature = "std")]
async fn stream_subscription_events(
subscription_id: String,
mut value_rx: tokio::sync::mpsc::Receiver<serde_json::Value>,
event_tx: tokio::sync::mpsc::UnboundedSender<Event>,
) {
let mut sequence: u64 = 1;
#[cfg(feature = "tracing")]
tracing::debug!(
"Event streaming task started for subscription: {}",
subscription_id
);
while let Some(json_value) = value_rx.recv().await {
let duration = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default();
let timestamp = format!("{}.{:09}", duration.as_secs(), duration.subsec_nanos());
let event = Event {
subscription_id: subscription_id.clone(),
sequence,
data: json_value,
timestamp,
dropped: None, };
if event_tx.send(event).is_err() {
#[cfg(feature = "tracing")]
tracing::debug!(
"Event channel closed, terminating stream for subscription: {}",
subscription_id
);
break;
}
sequence += 1;
}
#[cfg(feature = "tracing")]
tracing::debug!(
"Event streaming task terminated for subscription: {}",
subscription_id
);
}
#[cfg(feature = "std")]
async fn handle_record_unsubscribe(
conn_state: &mut ConnectionState,
request_id: u64,
params: Option<serde_json::Value>,
) -> Response {
let subscription_id = match params {
Some(serde_json::Value::Object(ref map)) => match map.get("subscription_id") {
Some(serde_json::Value::String(id)) => id.clone(),
_ => {
return Response::error(
request_id,
"invalid_params",
"Missing or invalid 'subscription_id' parameter".to_string(),
)
}
},
_ => {
return Response::error(
request_id,
"invalid_params",
"Missing 'subscription_id' parameter".to_string(),
)
}
};
#[cfg(feature = "tracing")]
tracing::debug!("Unsubscribing from subscription_id: {}", subscription_id);
match conn_state.subscriptions.remove(&subscription_id) {
Some(handle) => {
let _ = handle.cancel_tx.send(());
#[cfg(feature = "tracing")]
tracing::debug!(
"Cancelled subscription {} for record {}",
subscription_id,
handle.record_name
);
Response::success(
request_id,
serde_json::json!({
"subscription_id": subscription_id,
"status": "cancelled"
}),
)
}
None => {
#[cfg(feature = "tracing")]
tracing::warn!("Subscription not found: {}", subscription_id);
Response::error(
request_id,
"not_found",
format!("Subscription '{}' not found", subscription_id),
)
}
}
}
#[cfg(feature = "std")]
async fn handle_record_drain<R>(
db: &Arc<AimDb<R>>,
conn_state: &mut ConnectionState,
request_id: u64,
params: Option<serde_json::Value>,
) -> Response
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
let record_name = match params {
Some(serde_json::Value::Object(ref map)) => match map.get("name") {
Some(serde_json::Value::String(name)) => name.clone(),
_ => {
return Response::error(
request_id,
"invalid_params",
"Missing or invalid 'name' parameter (expected string)".to_string(),
);
}
},
_ => {
return Response::error(
request_id,
"invalid_params",
"Missing params object".to_string(),
);
}
};
let limit = params
.as_ref()
.and_then(|p| p.as_object())
.and_then(|map| map.get("limit"))
.and_then(|v| v.as_u64())
.map(|v| usize::try_from(v).unwrap_or(usize::MAX))
.unwrap_or(usize::MAX);
#[cfg(feature = "tracing")]
tracing::debug!(
"Draining record: {} (limit: {})",
record_name,
if limit == usize::MAX {
"all".to_string()
} else {
limit.to_string()
}
);
if !conn_state.drain_readers.contains_key(&record_name) {
let id = match db.inner().resolve_str(&record_name) {
Some(id) => id,
None => {
return Response::error(
request_id,
"not_found",
format!("Record '{}' not found", record_name),
);
}
};
let record = match db.inner().storage(id) {
Some(r) => r,
None => {
return Response::error(
request_id,
"not_found",
format!("Record '{}' storage not found", record_name),
);
}
};
let reader = match record.subscribe_json() {
Ok(r) => r,
Err(e) => {
return Response::error(
request_id,
"remote_access_not_enabled",
format!(
"Record '{}' not configured with .with_remote_access(): {}",
record_name, e
),
);
}
};
conn_state.drain_readers.insert(record_name.clone(), reader);
}
let reader = conn_state.drain_readers.get_mut(&record_name).unwrap();
let mut values = Vec::new();
loop {
if values.len() >= limit {
break;
}
match reader.try_recv_json() {
Ok(val) => values.push(val),
Err(DbError::BufferEmpty) => break,
Err(DbError::BufferLagged { .. }) => {
#[cfg(feature = "tracing")]
tracing::warn!(
"Drain reader lagged for record '{}' — some values were lost",
record_name
);
continue;
}
Err(_) => break,
}
}
let count = values.len();
#[cfg(feature = "tracing")]
tracing::debug!("Drained {} values from record '{}'", count, record_name);
Response::success(
request_id,
json!({
"record_name": record_name,
"values": values,
"count": count,
}),
)
}
pub type QueryHandlerFn = Box<
dyn Fn(
QueryHandlerParams,
) -> core::pin::Pin<
Box<dyn core::future::Future<Output = Result<serde_json::Value, String>> + Send>,
> + Send
+ Sync,
>;
#[derive(Debug, Clone)]
pub struct QueryHandlerParams {
pub name: String,
pub limit: Option<usize>,
pub start: Option<u64>,
pub end: Option<u64>,
}
#[cfg(feature = "std")]
async fn handle_record_query<R>(
db: &Arc<AimDb<R>>,
request_id: u64,
params: Option<serde_json::Value>,
) -> Response
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
let handler = match db.extensions().get::<QueryHandlerFn>() {
Some(h) => h,
None => {
return Response::error(
request_id,
"not_configured",
"Persistence not configured. Call .with_persistence() on the builder.".to_string(),
);
}
};
let (name, limit, start, end) = match ¶ms {
Some(serde_json::Value::Object(map)) => {
let name = map
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("*")
.to_string();
let limit = map
.get("limit")
.and_then(|v| v.as_u64())
.and_then(|v| usize::try_from(v).ok());
let start = map.get("start").and_then(|v| v.as_u64());
let end = map.get("end").and_then(|v| v.as_u64());
(name, limit, start, end)
}
_ => ("*".to_string(), None, None, None),
};
let query_params = QueryHandlerParams {
name,
limit,
start,
end,
};
match handler(query_params).await {
Ok(result) => Response::success(request_id, result),
Err(msg) => Response::error(request_id, "query_error", msg),
}
}
#[cfg(feature = "std")]
async fn handle_graph_nodes<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
#[cfg(feature = "tracing")]
tracing::debug!("Getting dependency graph nodes");
let graph = db.inner().dependency_graph();
let nodes = &graph.nodes;
#[cfg(feature = "tracing")]
tracing::debug!("Returning {} graph nodes", nodes.len());
Response::success(request_id, json!(nodes))
}
#[cfg(feature = "std")]
async fn handle_graph_edges<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
#[cfg(feature = "tracing")]
tracing::debug!("Getting dependency graph edges");
let graph = db.inner().dependency_graph();
let edges = &graph.edges;
#[cfg(feature = "tracing")]
tracing::debug!("Returning {} graph edges", edges.len());
Response::success(request_id, json!(edges))
}
#[cfg(feature = "std")]
async fn handle_graph_topo_order<R>(db: &Arc<AimDb<R>>, request_id: u64) -> Response
where
R: crate::RuntimeAdapter + crate::Spawn + 'static,
{
#[cfg(feature = "tracing")]
tracing::debug!("Getting topological order");
let graph = db.inner().dependency_graph();
let topo_order = graph.topo_order();
#[cfg(feature = "tracing")]
tracing::debug!(
"Returning topological order with {} records",
topo_order.len()
);
Response::success(request_id, json!(topo_order))
}