use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use runar_node::services::{LifecycleContext, RequestContext, ServiceFuture};
use runar_node::AbstractService;
use runar_serializer::{ArcValue, Plain, ValueCategory};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
use crate::sqlite::{
DataType, Params as SqlParams, Schema as SqliteSchemaDef, SqlQuery, Value as SqliteValue,
};
#[derive(Debug, Serialize, Deserialize, Clone, Plain)] pub struct InsertOneRequest {
pub collection: String,
pub document: HashMap<String, ArcValue>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Plain)]
pub struct InsertOneResponse {
pub inserted_id: String, }
#[derive(Debug, Serialize, Deserialize, Clone, Plain)]
pub struct FindOneRequest {
pub collection: String,
pub filter: HashMap<String, ArcValue>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Plain)]
pub struct FindOneResponse {
pub document: Option<HashMap<String, ArcValue>>,
}
pub struct CrudSqliteService {
name: String,
path: String,
version: String,
network_id: Option<String>,
description: String,
store_path: String, schema: Arc<SqliteSchemaDef>, }
impl CrudSqliteService {
pub fn new(
name: &str,
path: &str,
store_path: &str,
schema: SqliteSchemaDef, ) -> Self {
Self {
name: name.to_string(),
path: path.to_string(),
version: "0.0.1".to_string(),
network_id: None,
description: "CRUD Service".to_string(),
store_path: store_path.to_string(),
schema: Arc::new(schema),
}
}
fn schema_aware_convert_row_values(
&self,
collection_name: &str,
row_map: Arc<HashMap<String, ArcValue>>,
context: &RequestContext, ) -> Result<Arc<HashMap<String, ArcValue>>> {
let table_schema = self
.schema
.tables
.iter()
.find(|t| t.name == collection_name)
.ok_or_else(|| {
anyhow!(
"Schema not found for collection '{}' during row value conversion",
collection_name
)
})?;
let mut updates = row_map.as_ref().clone();
for (field_name, arc_value) in updates.iter_mut() {
if let Some(col_def) = table_schema.columns.iter().find(|c| c.name == *field_name) {
if col_def.data_type == DataType::Boolean {
match arc_value.category() {
ValueCategory::Primitive => {
if let Ok(int_val_ref) = arc_value.as_type_ref::<i64>() {
let bool_val = *int_val_ref != 0;
context.debug(format!(
"Converting field '{}' (i64: {}) to bool: {} for collection '{}' based on schema",
field_name, *int_val_ref, bool_val, collection_name
));
*arc_value = ArcValue::new_primitive(bool_val);
} else if arc_value.as_type_ref::<bool>().is_ok() {
} else {
context.warn(format!(
"Field '{field_name}' in collection '{collection_name}' is schema type Boolean, but ArcValue is Primitive but not i64 or bool: {arc_value:?}. Leaving as is.",
));
}
}
ValueCategory::Null => {
}
_ => {
context.warn(format!(
"Field '{field_name}' in collection '{collection_name}' is schema type Boolean, but ArcValue is not Primitive or Null: {arc_value:?}. Leaving as is.",
));
}
}
}
}
}
Ok(Arc::new(updates))
}
fn sqlite_action_path(&self, action: &str) -> String {
format!("{}/{}", self.store_path, action)
}
fn arc_value_to_sqlite_value(av: &mut ArcValue) -> Result<SqliteValue> {
match av.category() {
ValueCategory::Null => Ok(SqliteValue::Null),
ValueCategory::Primitive => {
if let Ok(s_val_arc) = av.as_type_ref::<String>() {
Ok(SqliteValue::Text(s_val_arc.as_ref().clone()))
} else if let Ok(i_val_arc) = av.as_type_ref::<i64>() {
Ok(SqliteValue::Integer(*i_val_arc))
} else if let Ok(f_val_arc) = av.as_type_ref::<f64>() {
Ok(SqliteValue::Real(*f_val_arc))
} else if let Ok(b_val_arc) = av.as_type_ref::<bool>() {
Ok(SqliteValue::Integer(if *b_val_arc { 1 } else { 0 })) } else {
Err(anyhow!(
"Unsupported primitive type '{}' within ArcValue for SQLite conversion.",
av.type_name().map_or_else(
|| "[value was None, inconsistent for Primitive category]".to_string(),
|tn| tn.to_string()
)
))
}
}
ValueCategory::Bytes => {
let bytes_vec_arc = av.as_type_ref::<Vec<u8>>()?;
Ok(SqliteValue::Blob(bytes_vec_arc.as_ref().clone()))
}
_ => Err(anyhow!(
"Unsupported ArcValue category {:?} for direct SQLite conversion",
av.category()
)),
}
}
async fn handle_insert_one(
self: Arc<Self>,
context: RequestContext,
request_payload: Option<ArcValue>,
) -> Result<ArcValue> {
context.debug(format!(
"Handling insertOne request for CrudSqliteService '{}'",
self.name
));
let payload =
request_payload.ok_or_else(|| anyhow!("Request payload is missing for insertOne"))?;
let req: Arc<InsertOneRequest> = payload
.as_type_ref::<InsertOneRequest>()
.with_context(|| "Failed to deserialize InsertOneRequest from payload")?;
context.info(format!(
"Attempting to insert into collection: {}",
req.collection
));
let table_def = self
.schema
.tables
.iter()
.find(|t| t.name == req.collection)
.ok_or_else(|| {
let err_msg = format!("Collection '{}' not found in schema.", req.collection);
context.error(&err_msg);
anyhow!(err_msg)
})?;
let mut doc_to_insert = req.document.clone();
let id_field_name = "_id".to_string();
let inserted_id_av = match doc_to_insert.get(&id_field_name) {
Some(id_val) => id_val.clone(), None => {
let new_id = Uuid::new_v4().to_string();
let new_id_av = ArcValue::new_primitive(new_id);
doc_to_insert.insert(id_field_name.clone(), new_id_av.clone());
new_id_av
}
};
let inserted_id_av = inserted_id_av.clone(); let inserted_id = inserted_id_av
.as_type_ref::<String>()
.with_context(|| "Inserted _id must be a string")?;
let estimated_fields = doc_to_insert.len();
let mut column_names: Vec<String> = Vec::with_capacity(estimated_fields);
let mut value_params: Vec<SqliteValue> = Vec::with_capacity(estimated_fields);
for (field_name, arc_value) in &mut doc_to_insert {
if !table_def.columns.iter().any(|c| &c.name == field_name) {
let err_msg = format!(
"Field '{}' not defined in schema for collection '{}'.",
field_name, req.collection
);
context.error(&err_msg);
return Err(anyhow!(err_msg));
}
let column_def = table_def
.columns
.iter()
.find(|c| &c.name == field_name)
.unwrap(); let expected_db_type = &column_def.data_type;
let provided_category = arc_value.category();
let type_match = match (expected_db_type, provided_category) {
(DataType::Integer, ValueCategory::Primitive) => {
matches!(arc_value.type_name(), Some("i64"))
}
(DataType::Real, ValueCategory::Primitive) => {
matches!(arc_value.type_name(), Some("f64"))
}
(DataType::Text, ValueCategory::Primitive) => {
matches!(
arc_value.type_name(),
Some("String") | Some("alloc::string::String")
)
}
(DataType::Boolean, ValueCategory::Primitive) => {
matches!(arc_value.type_name(), Some("bool"))
}
(DataType::Blob, ValueCategory::Bytes) => arc_value.has_value(),
(_, ValueCategory::Null) => !arc_value.has_value(),
_ => false, };
if !type_match {
let provided_type_name_for_error = arc_value.type_name().map_or_else(
|| format!("N/A (value is None, category: {provided_category:?})"),
|tn| tn.to_string(),
);
let error_message = format!(
"Type mismatch or inconsistent state for field '{field_name}': schema expects DB type {expected_db_type:?}. Received category {provided_category:?} with specific type '{provided_type_name_for_error}'.",
);
context.error(&error_message);
return Err(anyhow!(error_message));
}
column_names.push(format!("\"{field_name}\"")); let sqlite_val = Self::arc_value_to_sqlite_value(arc_value).with_context(|| {
format!("Failed to convert field '{field_name}' to SqliteValue")
})?;
value_params.push(sqlite_val);
}
if column_names.is_empty() {
return Err(anyhow!("Cannot insert an empty document."));
}
let column_names_sql = column_names.join(", ");
let placeholders_sql = (0..column_names.len())
.map(|_| "?")
.collect::<Vec<_>>()
.join(", ");
let sql = format!(
"INSERT INTO \"{}\" ({}) VALUES ({})",
req.collection, column_names_sql, placeholders_sql
);
context.debug(format!(
"Executing INSERT SQL: {sql} with params: {value_params:?}",
));
let sql_query = SqlQuery {
statement: sql,
params: SqlParams {
values: value_params,
},
};
let action_path = self.sqlite_action_path("execute_query");
let rows_affected: Arc<i64> = context
.request(&action_path, Some(ArcValue::new_struct(sql_query)))
.await
.expect("Failed to execute INSERT statement")
.as_type_ref()?;
match rows_affected.as_ref() {
1 => {
context.info(format!(
"Successfully inserted document with id '{}' into collection '{}'.",
inserted_id, req.collection
));
let response_struct = InsertOneResponse {
inserted_id: inserted_id.as_ref().clone(),
};
let final_response_av = ArcValue::new_struct(response_struct);
Ok(final_response_av)
}
other_rows_affected => {
let err_msg = format!(
"INSERT statement affected {} rows, expected 1, for id '{}' in collection '{}'.",
other_rows_affected, inserted_id, req.collection
);
context.error(&err_msg);
Err(anyhow!(err_msg))
}
}
}
async fn handle_find_one(
self: Arc<Self>,
context: RequestContext,
request_payload: Option<ArcValue>,
) -> Result<ArcValue> {
context.debug(format!(
"Handling findOne request for CrudSqliteService '{}'",
self.name
));
let payload =
request_payload.ok_or_else(|| anyhow!("Request payload is missing for findOne"))?;
let req: Arc<FindOneRequest> = payload
.as_type_ref::<FindOneRequest>()
.context("Failed to deserialize FindOneRequest from payload")?;
context.info(format!(
"Attempting to find_one in collection: '{}' with filter: {:?}",
req.collection, req.filter
));
let table_def = self
.schema
.tables
.iter()
.find(|t| t.name == req.collection)
.ok_or_else(|| {
let err_msg = format!("Collection '{}' not found in schema.", req.collection);
context.error(&err_msg);
anyhow!(err_msg)
})?;
if req.filter.is_empty() {
context.warn(format!(
"Filter is empty for findOne on collection '{}'. This will match the first document.",
req.collection
));
return Err(anyhow!("Filter cannot be empty for findOne operation."));
}
let estimated_filters = req.filter.len();
let mut where_clauses: Vec<String> = Vec::with_capacity(estimated_filters);
let mut value_params: Vec<SqliteValue> = Vec::with_capacity(estimated_filters);
for (field_name, arc_value) in req.filter.clone().iter_mut() {
if !table_def.columns.iter().any(|c| &c.name == field_name) {
let err_msg = format!(
"Filter field '{}' not defined in schema for collection '{}'.",
field_name, req.collection
);
context.error(&err_msg);
return Err(anyhow!(err_msg));
}
where_clauses.push(format!("\"{field_name}\" = ?"));
let sqlite_val = Self::arc_value_to_sqlite_value(arc_value).with_context(|| {
format!("Failed to convert filter field '{field_name}' to SqliteValue",)
})?;
value_params.push(sqlite_val);
}
let where_sql = where_clauses.join(" AND ");
let sql = format!(
"SELECT * FROM \"{}\" WHERE {} LIMIT 1",
req.collection, where_sql
);
context.debug(format!(
"Constructed SQL for findOne on '{}': \"{}\" with params {:?}",
req.collection, sql, &value_params
));
let sql_query_struct = SqlQuery {
statement: sql,
params: SqlParams {
values: value_params,
},
};
let action_path = self.sqlite_action_path("execute_query");
let sql_for_logging = sql_query_struct.statement.clone(); let request_payload_for_sqlite = ArcValue::new_struct(sql_query_struct);
context.debug(format!(
"Sending request to SqliteService at '{action_path}' with payload for SQL: {sql_for_logging}",
));
let rows: Arc<Vec<ArcValue>> = context
.request(&action_path, Some(request_payload_for_sqlite))
.await
.with_context(|| {
format!(
"Failed to execute SELECT statement for collection '{}' (SQL: '{}') via SqliteService at '{}'",
req.collection, sql_for_logging, action_path
)
})?
.as_type_ref::<Vec<ArcValue>>()?;
if !rows.is_empty() {
let document_arc_value_map = rows.first().unwrap(); context.info(format!(
"Found document in collection '{}' with filter {:?}: {:?}",
req.collection, req.filter, document_arc_value_map
));
match document_arc_value_map.as_type_ref::<HashMap<String, ArcValue>>() {
Ok(map_data) => {
match self.schema_aware_convert_row_values(&req.collection, map_data, &context)
{
Ok(converted_map_data) => {
let response_struct = FindOneResponse {
document: Some(converted_map_data.as_ref().clone()),
};
Ok(ArcValue::new_struct(response_struct))
}
Err(e) => {
context.error(format!(
"Failed schema-aware conversion for findOne on collection '{}': {}. Original map: {:?}",
req.collection, e, document_arc_value_map ));
Err(anyhow!("Failed schema-aware conversion for findOne: {}", e))
}
}
}
Err(e) => {
context.error(format!(
"Failed to convert found document ArcValue to map for FindOneResponse: {e}. Doc AV: {document_arc_value_map:?}",
));
Err(anyhow!(
"Failed to convert document ArcValue to map for FindOneResponse: {}",
e
))
}
}
} else {
context.info(format!(
"No document found in collection '{}' for filter {:?}. SQL: {}",
req.collection, req.filter, sql_for_logging
));
let response_struct = FindOneResponse { document: None };
Ok(ArcValue::new_struct(response_struct))
}
}
}
#[async_trait]
impl AbstractService for CrudSqliteService {
fn name(&self) -> &str {
&self.name
}
fn path(&self) -> &str {
&self.path
}
fn version(&self) -> &str {
&self.version
}
fn network_id(&self) -> Option<String> {
self.network_id.clone()
}
fn set_network_id(&mut self, network_id: String) {
self.network_id = Some(network_id);
}
fn description(&self) -> &str {
&self.description
}
async fn init(&self, context: LifecycleContext) -> Result<()> {
context.info(format!(
"Initializing CrudSqliteService '{}', store_path: '{}'",
self.name, self.store_path
));
let service_arc = Arc::new(self.clone_arc_safe().await?);
let service_arc_for_insert_capture = Arc::clone(&service_arc); let insert_one_handler =
Arc::new(move |payload: Option<ArcValue>, req_ctx: RequestContext| {
let service_for_async = Arc::clone(&service_arc_for_insert_capture); Box::pin(async move { service_for_async.handle_insert_one(req_ctx, payload).await })
as ServiceFuture
});
context
.register_action("insertOne", insert_one_handler)
.await?;
context.info(format!(
"Action 'insertOne' registered for service '{}'",
self.name
));
let service_arc_for_find_one = Arc::clone(&service_arc); let find_one_handler =
Arc::new(move |payload: Option<ArcValue>, req_ctx: RequestContext| {
let service_for_async = Arc::clone(&service_arc_for_find_one); Box::pin(async move { service_for_async.handle_find_one(req_ctx, payload).await })
as ServiceFuture
});
context.register_action("findOne", find_one_handler).await?;
context.info(format!(
"Action 'findOne' registered for service '{}'",
self.name
));
Ok(())
}
async fn start(&self, context: LifecycleContext) -> Result<()> {
context.info(format!("CrudSqliteService '{}' started.", self.name));
Ok(())
}
async fn stop(&self, context: LifecycleContext) -> Result<()> {
context.info(format!("CrudSqliteService '{}' stopped.", self.name));
Ok(())
}
}
impl CrudSqliteService {
async fn clone_arc_safe(&self) -> Result<Self>
where
Self: Clone,
{
Ok(self.clone())
}
}
impl Clone for CrudSqliteService {
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
path: self.path.clone(),
version: self.version.clone(),
network_id: self.network_id.clone(),
description: self.description.clone(),
store_path: self.store_path.clone(),
schema: Arc::clone(&self.schema),
}
}
}