use anyhow::{anyhow, Result};
use async_trait::async_trait;
use runar_common::logging::Logger;
use runar_macros_common::{log_debug, log_error, log_info, log_warn};
use runar_node::services::{EventContext, LifecycleContext, RequestContext, ServiceFuture};
use runar_node::AbstractService;
use runar_serializer::{ArcValue, Plain};
use rusqlite::types::ToSqlOutput;
use rusqlite::types::{Null, ValueRef as RusqliteValueRef};
use rusqlite::{params_from_iter, Connection, Result as RusqliteResult, ToSql};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::SystemTime;
use tokio::sync::{mpsc, oneshot};
const EXECUTE_QUERY_ACTION: &str = "execute_query";
const REPLICATION_GET_TABLE_EVENTS_ACTION: &str = "replication/get_table_events";
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum DataType {
Integer,
Real,
Text,
Blob,
Boolean,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnDefinition {
pub name: String,
pub data_type: DataType,
pub primary_key: bool,
pub autoincrement: bool,
pub not_null: bool,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TableDefinition {
pub name: String,
pub columns: Vec<ColumnDefinition>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IndexDefinition {
pub name: String,
pub table_name: String,
pub columns: Vec<String>,
pub unique: bool,
}
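/// A complete database schema: table definitions plus secondary indexes.
///
/// A minimal construction sketch (table and column names are illustrative):
///
/// ```ignore
/// let schema = Schema {
///     tables: vec![TableDefinition {
///         name: "users".to_string(),
///         columns: vec![ColumnDefinition {
///             name: "id".to_string(),
///             data_type: DataType::Integer,
///             primary_key: true,
///             autoincrement: true,
///             not_null: true,
///         }],
///     }],
///     indexes: vec![],
/// };
/// ```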
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Schema {
pub tables: Vec<TableDefinition>,
pub indexes: Vec<IndexDefinition>,
}
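/// Commands accepted by the [`SqliteWorker`]. Each variant carries a
/// `oneshot` sender over which the worker reports the outcome to the caller.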
pub enum SqliteWorkerCommand {
ApplySchema {
        schema: Schema,
        reply_to: oneshot::Sender<Result<(), String>>,
},
Execute {
        query: SqlQuery,
        reply_to: oneshot::Sender<Result<usize, String>>,
},
Query {
        query: SqlQuery,
        reply_to: oneshot::Sender<Result<Vec<HashMap<String, Value>>, String>>,
    },
Shutdown {
reply_to: oneshot::Sender<Result<(), String>>,
},
}
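/// Owns the SQLite connection on a dedicated thread and processes commands
/// from an mpsc queue. `rusqlite::Connection` is not `Sync`, so confining it
/// to a single worker and communicating over channels keeps the async service
/// side `Send` without ever sharing the connection across threads.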
pub struct SqliteWorker {
connection: Connection,
receiver: mpsc::Receiver<SqliteWorkerCommand>,
    logger: Arc<Logger>,
    ready_tx: Option<oneshot::Sender<()>>,
}
impl SqliteWorker {
pub fn new(
db_path: String,
receiver: mpsc::Receiver<SqliteWorkerCommand>,
logger: Arc<Logger>,
        ready_tx: oneshot::Sender<()>,
        symmetric_key: Option<Vec<u8>>,
    ) -> Result<Self, String> {
let connection = Connection::open(db_path.clone()).map_err(|e| {
let err_msg = format!("Failed to open SQLite connection to '{db_path}': {e}");
log_error!(logger, "{}", err_msg);
err_msg
})?;
if let Some(key_bytes) = symmetric_key {
let hex_key = format!("x'{}'", hex::encode(&key_bytes));
connection
.pragma_update(None, "key", &hex_key)
.map_err(|e| {
let err_msg = format!("Failed to set database key: {e}");
log_error!(logger, "{}", err_msg);
err_msg
})?;
log_debug!(
logger,
"Database key set successfully using provided symmetric key."
);
}
log_debug!(
logger,
"SqliteWorker::new: Connection opened. TODO: Apply pragmas/initial setup if needed."
);
Ok(Self {
connection,
receiver,
logger,
ready_tx: Some(ready_tx),
})
}
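    /// Signal readiness to the spawning thread, then drain commands until a
    /// `Shutdown` command arrives or every sender has been dropped.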
pub async fn run(mut self) {
if let Some(tx) = self.ready_tx.take() {
if tx.send(()).is_err() {
self.logger
.error("Failed to send ready signal; receiver was dropped.");
                return;
            }
}
log_info!(self.logger, "SqliteWorker started processing loop.");
while let Some(command) = self.receiver.recv().await {
match command {
SqliteWorkerCommand::ApplySchema { schema, reply_to } => {
log_debug!(self.logger, "Processing ApplySchema command");
let res = apply_schema_internal(&self.connection, &schema, &self.logger);
                    let _ = reply_to.send(res);
                }
SqliteWorkerCommand::Execute { query, reply_to } => {
log_debug!(self.logger, "Processing Execute command");
let res = execute_internal(
&self.connection,
&query.statement,
&query.params,
&self.logger,
);
let _ = reply_to.send(res);
}
SqliteWorkerCommand::Query { query, reply_to } => {
log_debug!(self.logger, "Processing Query command");
let res = query_internal(
&self.connection,
&query.statement,
&query.params,
&self.logger,
);
let _ = reply_to.send(res);
}
SqliteWorkerCommand::Shutdown { reply_to } => {
log_info!(self.logger, "SqliteWorker received Shutdown command.");
let _ = reply_to.send(Ok(()));
                    break;
                }
}
}
log_info!(self.logger, "SqliteWorker finished.");
}
}
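/// Build one DDL batch (`CREATE TABLE IF NOT EXISTS` / `CREATE INDEX IF NOT
/// EXISTS`) from the schema and execute it. The `IF NOT EXISTS` guards make
/// re-applying the same schema on restart a no-op.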
fn apply_schema_internal(
conn: &Connection,
schema: &Schema,
logger: &Arc<Logger>,
) -> Result<(), String> {
log_info!(logger, "Applying schema...");
let mut ddl_batch = String::new();
for table_def in &schema.tables {
let columns_ddl: Vec<String> = table_def
.columns
.iter()
.map(|col| {
let col_type_str = match col.data_type {
DataType::Integer => "INTEGER",
DataType::Real => "REAL",
DataType::Text => "TEXT",
DataType::Blob => "BLOB",
DataType::Boolean => "INTEGER", };
let mut col_ddl = format!("{} {}", col.name, col_type_str);
if col.primary_key {
col_ddl.push_str(" PRIMARY KEY");
if col.autoincrement {
col_ddl.push_str(" AUTOINCREMENT");
}
}
if col.not_null {
col_ddl.push_str(" NOT NULL");
}
col_ddl
})
.collect();
log_debug!(logger, "Preparing to create table: {}", table_def.name);
let table_ddl = format!(
"CREATE TABLE IF NOT EXISTS {} ({});\n",
table_def.name,
columns_ddl.join(", ")
);
ddl_batch.push_str(&table_ddl);
}
for index_def in &schema.indexes {
if index_def.columns.is_empty() {
log_warn!(
logger,
"Skipping index '{}' for table '{}' as it has no columns defined.",
index_def.name,
index_def.table_name
);
continue;
}
let unique_str = if index_def.unique { "UNIQUE " } else { "" };
let columns_list = index_def.columns.join(", ");
log_debug!(
logger,
"Preparing to create index: {} on table {}",
index_def.name,
index_def.table_name
);
let index_ddl = format!(
"CREATE {}INDEX IF NOT EXISTS {} ON {} ({});\n",
unique_str, index_def.name, index_def.table_name, columns_list
);
ddl_batch.push_str(&index_ddl);
}
if ddl_batch.is_empty() {
log_debug!(
logger,
"No DDL statements to execute for the provided schema."
);
return Ok(());
}
conn.execute_batch(&ddl_batch).map_err(|e| {
let err_msg = format!("Failed to apply schema batch: {e}. DDL:\n{ddl_batch}");
log_error!(logger, "{}", err_msg);
err_msg
})
}
fn execute_internal(
conn: &Connection,
sql: &str,
    params: &Params,
    logger: &Arc<Logger>,
) -> Result<usize, String> {
log_debug!(logger, "Executing SQL: {}", sql);
let rusqlite_params_results: Result<Vec<Box<dyn ToSql + Send + Sync>>, String> =
params.values.iter().map(value_to_to_sql).collect();
match rusqlite_params_results {
Ok(rusqlite_params) => {
let params_for_iter: Vec<&(dyn rusqlite::types::ToSql + Send + Sync)> =
rusqlite_params.iter().map(|b| b.as_ref()).collect();
log_debug!(logger, "Executing SQL: {} with params: {:?}", sql, params);
conn.execute(sql, params_from_iter(params_for_iter))
.map_err(|e| {
let err_msg = format!("Failed to execute SQL '{sql}': {e}");
log_error!(logger, "{}", err_msg);
err_msg
})
}
        Err(e) => Err(format!(
            "Failed to convert Value params to SQL params: {e}"
        )),
}
}
fn query_internal(
conn: &Connection,
sql: &str,
    params: &Params,
    logger: &Arc<Logger>,
) -> Result<Vec<HashMap<String, Value>>, String> {
let rusqlite_params_results: Result<Vec<Box<dyn ToSql + Send + Sync>>, String> =
params.values.iter().map(value_to_to_sql).collect();
let rusqlite_params = match rusqlite_params_results {
Ok(p) => p,
Err(e) => {
            return Err(format!(
                "Failed to convert Value params to SQL params for query: {e}"
            ));
}
};
let params_for_iter: Vec<&(dyn rusqlite::types::ToSql + Send + Sync)> =
rusqlite_params.iter().map(|b| b.as_ref()).collect();
log_debug!(
logger,
"Preparing SQL query: {} with params: {:?}",
sql,
params
);
let mut stmt = conn.prepare(sql).map_err(|e| {
let err_msg = format!("Error preparing statement for SQL '{sql}': {e}");
log_error!(logger, "{}", err_msg);
err_msg
})?;
let column_names: Vec<String> = stmt
.column_names()
.into_iter()
.map(|s| s.to_string())
.collect();
log_debug!(
logger,
"Executing SQL query: {} with params: {:?}",
sql,
params
);
let rows_iter = stmt
.query_map(params_from_iter(params_for_iter.into_iter()), |row| {
let mut map = HashMap::new();
for (i, name) in column_names.iter().enumerate() {
                map.insert(name.clone(), value_ref_to_value(row.get_ref(i)?));
}
                Ok(map)
            })
.map_err(|e| {
let err_msg = format!("Error executing query '{sql}': {e}");
log_error!(logger, "{}", err_msg);
err_msg
})?;
rows_iter
.collect::<RusqliteResult<Vec<HashMap<String, Value>>>>()
.map_err(|e| {
let err_msg = format!("Error collecting query results for '{sql}': {e}");
log_error!(logger, "{}", err_msg);
err_msg
})
}
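/// Convert a [`Value`] into a boxed `ToSql` parameter; booleans are bound as
/// the integers 0/1 since SQLite has no boolean storage class.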
fn value_to_to_sql(val: &Value) -> Result<Box<dyn ToSql + Send + Sync>, String> {
match val {
Value::Null => Ok(Box::new(Null)),
Value::Integer(i) => Ok(Box::new(*i)),
Value::Real(f) => Ok(Box::new(*f)),
Value::Text(s) => Ok(Box::new(s.clone())),
Value::Blob(b) => Ok(Box::new(b.clone())),
        Value::Boolean(b) => Ok(Box::new(if *b { 1i64 } else { 0i64 })),
    }
}
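/// Convert a borrowed rusqlite value into the service's owned [`Value`].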
fn value_ref_to_value(value_ref: RusqliteValueRef<'_>) -> Value {
match value_ref {
RusqliteValueRef::Null => Value::Null,
RusqliteValueRef::Integer(i) => Value::Integer(i),
RusqliteValueRef::Real(f) => Value::Real(f),
RusqliteValueRef::Text(t_bytes) => {
Value::Text(String::from_utf8_lossy(t_bytes).into_owned())
}
RusqliteValueRef::Blob(blob_bytes) => Value::Blob(blob_bytes.to_vec()),
}
}
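/// Convert a [`Value`] into an [`ArcValue`] for the action response payload.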
fn internal_value_to_arc_value(value: &Value) -> ArcValue {
match value {
        Value::Null => ArcValue::null(),
        Value::Integer(i) => ArcValue::new_primitive(*i),
Value::Real(f) => ArcValue::new_primitive(*f),
Value::Text(s) => ArcValue::new_primitive(s.clone()),
        Value::Blob(b) => ArcValue::new_bytes(b.clone()),
Value::Boolean(b) => ArcValue::new_primitive(*b),
}
}
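/// The service's owned value type, mirroring SQLite's storage classes
/// (NULL, INTEGER, REAL, TEXT, BLOB) plus a logical `Boolean` that is
/// persisted as INTEGER 0/1.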
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Value {
Null,
Integer(i64),
Real(f64),
Text(String),
Blob(Vec<u8>),
Boolean(bool),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SqlRow {
pub columns: HashMap<String, Value>,
}
impl ToSql for Value {
fn to_sql(&self) -> Result<ToSqlOutput<'_>, rusqlite::Error> {
match self {
Value::Null => Ok(ToSqlOutput::from(Null)),
Value::Integer(i) => Ok(ToSqlOutput::from(*i)),
Value::Real(f) => Ok(ToSqlOutput::from(*f)),
Value::Text(s) => Ok(ToSqlOutput::from(s.as_str())),
Value::Blob(b) => Ok(ToSqlOutput::from(b.as_slice())),
Value::Boolean(b) => Ok(ToSqlOutput::from(if *b { 1i64 } else { 0i64 })),
}
}
}
impl From<rusqlite::types::Value> for Value {
fn from(db_value: rusqlite::types::Value) -> Self {
match db_value {
rusqlite::types::Value::Null => Value::Null,
rusqlite::types::Value::Integer(i) => Value::Integer(i),
rusqlite::types::Value::Real(f) => Value::Real(f),
rusqlite::types::Value::Text(s) => Value::Text(s),
rusqlite::types::Value::Blob(b) => Value::Blob(b),
}
}
}
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct Params {
pub values: Vec<Value>,
}
impl Params {
pub fn new() -> Self {
Self::default()
}
pub fn with_value(mut self, value: impl Into<Value>) -> Self {
self.values.push(value.into());
self
}
}
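/// A SQL statement plus its positional parameters.
///
/// A minimal construction sketch (statement and values are illustrative):
///
/// ```ignore
/// let q = SqlQuery::new("INSERT INTO users (name) VALUES (?)")
///     .with_params(Params::new().with_value(Value::Text("alice".to_string())));
/// ```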
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Plain)]
pub struct SqlQuery {
pub statement: String,
pub params: Params,
}
impl SqlQuery {
pub fn new(statement: &str) -> Self {
Self {
statement: statement.to_string(),
params: Params::new(),
}
}
pub fn with_params(mut self, params: Params) -> Self {
self.params = params;
self
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum QueryOperator {
Equal(Value),
NotEqual(Value),
GreaterThan(Value),
GreaterThanOrEqual(Value),
LessThan(Value),
LessThanOrEqual(Value),
Like(String),
In(Vec<Value>),
}
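/// Column-to-operator conditions for building `WHERE` clauses, e.g.
/// `Query::new().with_condition("age", QueryOperator::GreaterThan(Value::Integer(18)))`.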
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Query {
pub conditions: HashMap<String, QueryOperator>,
}
impl Query {
pub fn new() -> Self {
Self::default()
}
pub fn with_condition(mut self, field: &str, op: QueryOperator) -> Self {
self.conditions.insert(field.to_string(), op);
self
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct CreateOperation {
pub table: String,
pub data: HashMap<String, Value>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct ReadOperation {
pub table: String,
pub query: Query,
pub fields: Option<Vec<String>>,
pub limit: Option<u32>,
pub offset: Option<u32>,
    pub order_by: Option<Vec<(String, bool)>>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateOperation {
pub table: String,
pub query: Query,
pub updates: HashMap<String, Value>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct DeleteOperation {
pub table: String,
pub query: Query,
}
#[derive(Debug, Clone, PartialEq)]
pub enum CrudOperation {
Create(CreateOperation),
Read(ReadOperation),
Update(UpdateOperation),
Delete(DeleteOperation),
}
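/// Configuration for a [`SqliteService`].
///
/// A construction sketch (the database path is illustrative):
///
/// ```ignore
/// let config = SqliteConfig::new("app.db", schema, /* encryption */ false);
/// ```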
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SqliteConfig {
pub db_path: String,
pub schema: Schema,
pub encryption: bool,
pub replication: Option<crate::replication::ReplicationConfig>,
}
impl SqliteConfig {
pub fn new(db_path: &str, schema: Schema, encryption: bool) -> Self {
Self {
db_path: db_path.to_string(),
schema,
encryption,
replication: None,
}
}
pub fn with_replication(mut self, replication: crate::replication::ReplicationConfig) -> Self {
self.replication = Some(replication);
self
}
}
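/// A service fronting a SQLite database: requests are forwarded over a
/// channel to the dedicated [`SqliteWorker`] thread, and, when replication is
/// configured, write statements are additionally published as events.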
pub struct SqliteService {
name: String,
path: String,
version: String,
description: String,
config: SqliteConfig,
worker_tx: Arc<RwLock<Option<mpsc::Sender<SqliteWorkerCommand>>>>,
network_id: Option<String>,
replication_manager: Arc<RwLock<Option<Arc<crate::replication::ReplicationManager>>>>,
}
impl Clone for SqliteService {
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
path: self.path.clone(),
version: self.version.clone(),
description: self.description.clone(),
config: self.config.clone(),
worker_tx: self.worker_tx.clone(),
network_id: self.network_id.clone(),
replication_manager: self.replication_manager.clone(),
}
}
}
impl SqliteService {
pub fn new(name: &str, path: &str, config: SqliteConfig) -> Self {
Self {
name: name.to_string(),
path: path.to_string(),
version: "0.0.1".to_string(),
description: "SQLite service".to_string(),
config,
worker_tx: Arc::new(RwLock::new(None)),
network_id: None,
replication_manager: Arc::new(RwLock::new(None)),
}
}
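    /// Allocate the next per-table origin sequence number used to order
    /// replication events: upsert `origin_seq::<table>` into
    /// `replication_meta`, increment it, then read it back. The single worker
    /// thread serializes the three statements.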
pub async fn next_origin_seq(&self, table: &str) -> Result<i64, String> {
let key = format!("origin_seq::{table}");
let _ = self
.send_command(|reply_tx| SqliteWorkerCommand::Execute {
query: SqlQuery::new("INSERT INTO replication_meta (key, value) VALUES (?, '0') ON CONFLICT(key) DO NOTHING")
.with_params(Params::new().with_value(Value::Text(key.clone()))),
reply_to: reply_tx,
})
.await?;
let _ = self
.send_command(|reply_tx| SqliteWorkerCommand::Execute {
query: SqlQuery::new("UPDATE replication_meta SET value = CAST(CAST(value AS INTEGER) + 1 AS TEXT) WHERE key = ?")
.with_params(Params::new().with_value(Value::Text(key.clone()))),
reply_to: reply_tx,
})
.await?;
let rows = self
.send_command(|reply_tx| SqliteWorkerCommand::Query {
query: SqlQuery::new("SELECT value FROM replication_meta WHERE key = ?")
.with_params(Params::new().with_value(Value::Text(key))),
reply_to: reply_tx,
})
.await?;
if let Some(first) = rows.first() {
if let Some(Value::Text(s)) = first.get("value") {
if let Ok(parsed) = s.parse::<i64>() {
return Ok(parsed);
}
}
}
Err("Failed to read next origin sequence".to_string())
}
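    /// Send a command to the worker thread and await its typed reply. The
    /// closure receives the `oneshot` sender that must be embedded in the
    /// command. A usage sketch (the query is illustrative):
    ///
    /// ```ignore
    /// let rows = service
    ///     .send_command(|tx| SqliteWorkerCommand::Query {
    ///         query: SqlQuery::new("SELECT 1"),
    ///         reply_to: tx,
    ///     })
    ///     .await?;
    /// ```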
pub async fn send_command<T: Send + 'static>(
&self,
constructor: impl FnOnce(oneshot::Sender<Result<T, String>>) -> SqliteWorkerCommand,
) -> Result<T, String> {
let maybe_sender = {
let guard = self
.worker_tx
.read()
.map_err(|e| format!("Failed to acquire read lock on worker_tx: {e}"))?;
guard.clone()
};
if let Some(cloned_sender) = maybe_sender {
let (reply_tx, reply_rx) = oneshot::channel();
let command = constructor(reply_tx);
if cloned_sender.is_closed() {
return Err("SqliteWorker channel is closed. Worker may have panicked.".to_string());
}
            cloned_sender
                .send(command)
.await
.map_err(|e| format!("Failed to send command to SqliteWorker: {e}"))?;
match reply_rx.await {
Ok(result) => result,
Err(e) => Err(format!("Failed to receive reply from SqliteWorker: {e}")),
}
} else {
Err(
"SqliteWorker sender is not initialized. Service may not have been started."
.to_string(),
)
}
}
async fn apply_schema(&self, schema: Schema, context: &LifecycleContext) -> Result<()> {
        let schema_to_apply = schema;
        context.info(format!(
"Attempting to apply schema for SqliteService: {}",
self.name
));
match self
.send_command(|reply_tx| SqliteWorkerCommand::ApplySchema {
                schema: schema_to_apply.clone(),
                reply_to: reply_tx,
})
.await
{
Ok(_) => {
context.info(format!("Schema application command successfully processed by worker for SqliteService: {}", self.name));
Ok(())
}
Err(e) => {
let err_msg = format!(
"Failed to apply schema for SqliteService '{}': {}",
self.name, e
);
context.error(err_msg.clone());
Err(anyhow!(err_msg))
}
}
}
}
#[async_trait]
impl AbstractService for SqliteService {
fn name(&self) -> &str {
&self.name
}
fn version(&self) -> &str {
&self.version
}
fn path(&self) -> &str {
&self.path
}
fn description(&self) -> &str {
&self.description
}
fn network_id(&self) -> Option<String> {
self.network_id.clone()
}
fn set_network_id(&mut self, network_id: String) {
self.network_id = Some(network_id);
}
async fn init(&self, context: LifecycleContext) -> Result<()> {
context.info(format!("Initializing SqliteService: {}", self.name));
let service_arc = Arc::new(self.clone());
let execute_query_handler = {
let s_arc = service_arc.clone();
Arc::new(
move |params_opt: Option<ArcValue>, req_ctx: RequestContext| {
let service_clone = s_arc.clone();
Box::pin(async move {
                    let query_arc_value = params_opt.ok_or_else(|| {
                        anyhow!("Missing payload for 'execute_query' action. Expected ArcValue wrapping SqlQuery.")
                    })?;
let sql_query_struct = query_arc_value.as_type_ref::<SqlQuery>()
.map_err(|original_error_from_as_type| {
anyhow!(format!(
"Invalid payload type for 'execute_query'. Expected SqlQuery, got {:?}. Original error: {:?}",
query_arc_value.category(), original_error_from_as_type
))
})?;
                    let sql_statement = sql_query_struct.statement.clone();
                    let query_to_send = sql_query_struct.as_ref().clone();
let trimmed_sql = sql_statement.trim_start().to_uppercase();
if trimmed_sql.starts_with("SELECT") {
let internal_results: Vec<HashMap<String, Value>> = service_clone
.send_command(|reply_tx| SqliteWorkerCommand::Query {
query: query_to_send,
reply_to: reply_tx,
})
.await
.map_err(|e: String| anyhow!(e))?;
let arc_results: Vec<HashMap<String, ArcValue>> = internal_results
.into_iter()
.map(|hmap| {
hmap.into_iter()
.map(|(k, v_internal)| {
(k, internal_value_to_arc_value(&v_internal))
})
.collect::<HashMap<String, ArcValue>>()
})
.collect();
let result_list: Vec<ArcValue> = arc_results
.into_iter()
                            .map(|hmap_arc| ArcValue::new_map(hmap_arc.into_iter().collect()))
                            .collect();
Ok(ArcValue::new_list(result_list))
} else {
let affected_rows: usize = service_clone
.send_command(|reply_tx| SqliteWorkerCommand::Execute {
query: query_to_send,
reply_to: reply_tx,
})
.await
.map_err(|e: String| anyhow!(e))?;
if let Some(replication_config) = &service_clone.config.replication {
if let Some(table_name) = extract_table_name(&sql_statement) {
let table_name = table_name.clone();
if replication_config.enabled_tables.contains(&table_name) {
let event = crate::replication::SqliteEvent {
operation: determine_operation_type(&trimmed_sql)
.to_lowercase(),
table: table_name.clone(),
data: query_arc_value.clone(),
timestamp: SystemTime::now(),
origin_node_id: req_ctx.logger.node_id().to_string(),
origin_seq: service_clone
.next_origin_seq(&table_name)
.await
.unwrap_or(0),
};
let service_path = service_clone.path.clone();
let event_path = format!(
"{}/{}/{}",
service_path, table_name, event.operation
);
req_ctx.info(format!(
"📤 Publishing SQLite event: path={}, table={}, operation={}",
event_path, table_name, event.operation
));
req_ctx
.publish(&event_path, Some(ArcValue::new_struct(event)))
.await?;
req_ctx.info("✅ SQLite event published successfully");
} else {
req_ctx.debug(format!(
"⏭️ Skipping event for table '{}' - not in enabled_tables: {:?}",
table_name, replication_config.enabled_tables
));
}
} else {
req_ctx.debug(
"⏭️ Could not extract table name from SQL statement",
);
}
}
Ok(ArcValue::new_primitive(affected_rows as i64))
}
                }) as ServiceFuture
            },
)
};
context
.register_action(EXECUTE_QUERY_ACTION, execute_query_handler)
.await?;
context.info(format!(
"'{}' action registered for SqliteService: {}",
EXECUTE_QUERY_ACTION, self.name
));
context.info(format!(
"Checking replication config for service {}: {:?}",
self.name,
self.config.replication.is_some()
));
if let Some(replication_config) = &self.config.replication {
context.info("Initializing replication manager...");
let node_id = self
.network_id
.clone()
.ok_or_else(|| anyhow!("Network ID is required for replication"))?;
let replication_manager = Arc::new(crate::replication::ReplicationManager::new(
Arc::new(self.clone()),
replication_config.clone(),
context.logger.clone(),
node_id,
));
let get_table_events_handler = {
let replication_manager_clone = replication_manager.clone();
Arc::new(
move |params_opt: Option<ArcValue>, _req_ctx: RequestContext| {
let replication_manager_clone = replication_manager_clone.clone();
Box::pin(async move {
if let Some(request_data) = params_opt {
let request = request_data
.as_type_ref::<crate::replication::TableEventsRequest>()?;
let response =
replication_manager_clone.get_table_events(request).await?;
Ok(ArcValue::new_struct(response))
} else {
Err(anyhow!("No request data provided"))
}
}) as ServiceFuture
},
)
};
context
.register_action(
REPLICATION_GET_TABLE_EVENTS_ACTION,
get_table_events_handler,
)
.await?;
{
let mut manager_guard = self.replication_manager.write().map_err(|e| {
anyhow!("Failed to acquire write lock on replication_manager: {e}")
})?;
*manager_guard = Some(replication_manager.clone());
}
            for table in &replication_config.enabled_tables {
let create_path = format!("{}/{}/create", self.path, table);
let update_path = format!("{}/{}/update", self.path, table);
let delete_path = format!("{}/{}/delete", self.path, table);
let event_handler = {
                    let replication_manager_clone = replication_manager.clone();
                    Arc::new(move |ctx: Arc<EventContext>, event: Option<ArcValue>| {
                        let replication_manager_clone = replication_manager_clone.clone();
                        Box::pin(async move {
ctx.info("🎯 Event handler triggered");
if let Some(event_data) = event {
ctx.info("📦 Event data received");
let sqlite_event = (*event_data
.as_type_ref::<crate::replication::SqliteEvent>()?)
.clone();
let is_local = ctx.is_local();
ctx.info(format!(
"🔄 Processing SQLite event: table={}, operation={}, is_local={}",
sqlite_event.table, sqlite_event.operation, is_local
));
replication_manager_clone
.handle_sqlite_event(sqlite_event, is_local)
.await?;
ctx.info("✅ Event processing completed");
} else {
ctx.debug("📭 No event data provided");
}
Ok(())
})
as Pin<Box<dyn Future<Output = Result<()>> + Send>>
})
};
context
.subscribe(&create_path, event_handler.clone(), None)
.await?;
context
.subscribe(&update_path, event_handler.clone(), None)
.await?;
context.subscribe(&delete_path, event_handler, None).await?;
}
}
context.info("Event handlers registered for replication");
Ok(())
}
async fn start(&self, context: LifecycleContext) -> Result<()> {
let service_arc = Arc::new(self.clone());
context.info(format!(
"SqliteService '{}' starting worker and applying schema.",
self.name
));
let (tx, rx) = mpsc::channel(32);
let (ready_tx, ready_rx) = oneshot::channel();
{
let mut worker_tx_guard = self
.worker_tx
.write()
.map_err(|e| anyhow!("Failed to acquire write lock on worker_tx: {}", e))?;
*worker_tx_guard = Some(tx);
}
let db_path_clone = self.config.db_path.clone();
let schema_clone = self.config.schema.clone();
let logger_clone_for_thread = context.logger.clone();
let mut encryption_key: Option<Vec<u8>> = None;
if self.config.encryption {
context.info("SqliteService encryption enabled - requesting symmetric key.");
let key_name = format!(
"sqlite_{}_{}_{}",
self.path,
self.version,
self.network_id.as_ref().expect("network_id is required")
);
let key_arc = context
.request(
"$keys/ensure_symmetric_key",
Some(ArcValue::new_primitive(key_name)),
)
.await?;
let key = key_arc.as_type_ref::<Vec<u8>>()?;
encryption_key = Some(key.as_ref().clone());
} else {
context.warn("SqliteService encryption disabled.");
}
thread::spawn(move || {
let worker_runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime for SqliteWorker");
worker_runtime.block_on(async move {
match SqliteWorker::new(
db_path_clone,
rx,
logger_clone_for_thread.clone(),
                    ready_tx,
                    encryption_key,
) {
Ok(worker) => {
logger_clone_for_thread.info("SqliteWorker thread starting run loop.");
worker.run().await;
logger_clone_for_thread.info("SqliteWorker thread finished.");
}
Err(e) => {
                        logger_clone_for_thread
                            .error(format!("Failed to initialize SqliteWorker in thread: {e}"));
}
}
});
});
ready_rx
.await
.map_err(|e| anyhow!("SqliteWorker failed to start: {}", e))?;
context.debug("SqliteWorker has signaled it is ready.");
self.apply_schema(schema_clone, &context).await?;
if let Some(replication_config) = &self.config.replication {
context.info("Starting replication manager...");
if let Some(replication_manager) = {
let manager_guard = service_arc.replication_manager.read().map_err(|e| {
anyhow!("Failed to acquire read lock on replication_manager: {e}")
})?;
manager_guard.clone()
} {
replication_manager.create_event_tables(&context).await?;
if replication_config.startup_sync {
context.info("Starting replication synchronization...");
replication_manager
.sync_on_startup(&context, replication_config)
.await?;
context.info("Replication synchronization completed");
}
}
}
context.info(format!(
"SqliteService '{}' started successfully.",
self.name
));
Ok(())
}
async fn stop(&self, context: LifecycleContext) -> Result<()> {
context.info(format!("Stopping SqliteService: {}", self.name));
        match self
            .send_command(|reply_tx| SqliteWorkerCommand::Shutdown { reply_to: reply_tx })
            .await
        {
            Ok(_) => context.info(format!(
                "SqliteService '{}' worker acknowledged shutdown.",
                self.name
            )),
            Err(e) => context.error(format!(
                "Error sending Shutdown to SqliteService '{}' worker: {e}. Worker might have already terminated.",
                self.name
            )),
}
Ok(())
}
}
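/// Best-effort extraction of the target table from an INSERT/UPDATE/DELETE
/// statement, e.g. `extract_table_name("UPDATE users SET name = ?")` yields
/// `Some("users")`; any other statement yields `None`.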
pub fn extract_table_name(sql: &str) -> Option<String> {
    let sql_upper = sql.trim_start().to_uppercase();
    let after_keyword = if let Some(rest) = sql_upper.strip_prefix("INSERT INTO ") {
        rest
    } else if let Some(rest) = sql_upper.strip_prefix("UPDATE ") {
        rest
    } else if let Some(rest) = sql_upper.strip_prefix("DELETE FROM ") {
        rest
    } else {
        return None;
    };
    // Take the identifier up to the first whitespace or '(' so that both
    // `INSERT INTO users(name) ...` and a bare `DELETE FROM users` work.
    let table: String = after_keyword
        .chars()
        .take_while(|c| !c.is_whitespace() && *c != '(')
        .collect();
    if table.is_empty() {
        None
    } else {
        Some(table.to_lowercase())
    }
}
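/// Map the leading SQL verb to a CRUD operation name: INSERT → "CREATE",
/// UPDATE → "UPDATE", DELETE → "DELETE", anything else → "OTHER".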
pub fn determine_operation_type(sql: &str) -> String {
let sql_upper = sql.trim_start().to_uppercase();
if sql_upper.starts_with("INSERT") {
"CREATE".to_string()
} else if sql_upper.starts_with("UPDATE") {
"UPDATE".to_string()
} else if sql_upper.starts_with("DELETE") {
"DELETE".to_string()
} else {
"OTHER".to_string()
}
}
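// A small sanity check for the SQL helpers above (illustrative, not part of
// the public API); it exercises table-name extraction and verb mapping.
#[cfg(test)]
mod sql_helper_tests {
    use super::*;

    #[test]
    fn extracts_table_names() {
        assert_eq!(
            extract_table_name("INSERT INTO users (name) VALUES (?)"),
            Some("users".to_string())
        );
        // No space between the table name and the column list.
        assert_eq!(
            extract_table_name("insert into users(name) values (?)"),
            Some("users".to_string())
        );
        // Statement ends immediately after the table name.
        assert_eq!(
            extract_table_name("DELETE FROM users"),
            Some("users".to_string())
        );
        assert_eq!(extract_table_name("SELECT * FROM users"), None);
    }

    #[test]
    fn maps_sql_verbs_to_crud() {
        assert_eq!(determine_operation_type("INSERT INTO t VALUES (1)"), "CREATE");
        assert_eq!(determine_operation_type("update t set a = 1"), "UPDATE");
        assert_eq!(determine_operation_type("DELETE FROM t"), "DELETE");
        assert_eq!(determine_operation_type("SELECT 1"), "OTHER");
    }
}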