use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use serde::{Deserialize, Serialize};
use crate::{database_sync_operations::MutexSlot, errors::Error, Result};
pub struct Coro<Ctx> {
pub ctx: Mutex<Ctx>,
gen: genawaiter::sync::Co<SyncEngineIoResult, Result<Ctx>>,
}
impl<Ctx> Coro<Ctx> {
pub fn new(ctx: Ctx, gen: genawaiter::sync::Co<SyncEngineIoResult, Result<Ctx>>) -> Self {
Self {
ctx: Mutex::new(ctx),
gen,
}
}
pub async fn yield_(&self, value: SyncEngineIoResult) -> Result<()> {
let ctx = self.gen.yield_(value).await?;
*self.ctx.lock().unwrap() = ctx;
Ok(())
}
}
impl From<genawaiter::sync::Co<SyncEngineIoResult, Result<()>>> for Coro<()> {
fn from(value: genawaiter::sync::Co<SyncEngineIoResult, Result<()>>) -> Self {
Self {
gen: value,
ctx: Mutex::new(()),
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PartialBootstrapStrategy {
Prefix { length: usize },
Query { query: String },
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PartialSyncOpts {
pub bootstrap_strategy: Option<PartialBootstrapStrategy>,
pub segment_size: usize,
pub prefetch: bool,
}
impl PartialSyncOpts {
pub fn segment_size(&self) -> usize {
if self.segment_size == 0 {
128 * 1024
} else {
self.segment_size
}
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct DbSyncInfo {
pub current_generation: u64,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct DbSyncStatus {
pub baton: Option<String>,
pub status: String,
pub generation: u64,
pub max_frame_no: u64,
}
pub struct DbChangesStatus {
pub time: turso_core::WallClockInstant,
pub revision: DatabasePullRevision,
pub file_slot: Option<MutexSlot<Arc<dyn turso_core::File>>>,
}
impl std::fmt::Debug for DbChangesStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DbChangesStatus")
.field("time", &self.time)
.field("revision", &self.revision)
.field("file_slot.is_some()", &self.file_slot.is_some())
.finish()
}
}
#[derive(Debug, Serialize)]
pub struct SyncEngineStats {
pub cdc_operations: i64,
pub main_wal_size: u64,
pub revert_wal_size: u64,
pub last_pull_unix_time: Option<i64>,
pub last_push_unix_time: Option<i64>,
pub revision: Option<String>,
pub network_sent_bytes: usize,
pub network_received_bytes: usize,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DatabaseChangeType {
Delete,
Update,
Insert,
Commit,
}
pub const DATABASE_METADATA_VERSION: &str = "v1";
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct DatabaseMetadata {
pub version: String,
pub client_unique_id: String,
pub synced_revision: Option<DatabasePullRevision>,
pub revert_since_wal_salt: Option<Vec<u32>>,
pub revert_since_wal_watermark: u64,
pub last_pull_unix_time: Option<i64>,
pub last_push_unix_time: Option<i64>,
pub last_pushed_pull_gen_hint: i64,
pub last_pushed_change_id_hint: i64,
pub partial_bootstrap_server_revision: Option<DatabasePullRevision>,
pub saved_configuration: Option<DatabaseSavedConfiguration>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct DatabaseSavedConfiguration {
pub remote_url: Option<String>,
pub partial_sync_prefetch: Option<bool>,
pub partial_sync_segment_size: Option<usize>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DatabasePullRevision {
Legacy {
generation: u64,
synced_frame_no: Option<u64>,
},
V1 {
revision: String,
},
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum DatabaseSyncEngineProtocolVersion {
Legacy,
V1,
}
impl DatabaseMetadata {
pub fn remote_url(&self) -> Option<String> {
self.saved_configuration
.as_ref()
.and_then(|x| x.remote_url.as_deref())
.map(|x| x.to_string())
}
pub fn partial_sync_opts(&self) -> Option<PartialSyncOpts> {
if self.partial_bootstrap_server_revision.is_none() {
None
} else {
let partial_sync_opts = PartialSyncOpts {
bootstrap_strategy: None,
segment_size: self
.saved_configuration
.as_ref()
.and_then(|x| x.partial_sync_segment_size)
.unwrap_or(128 * 1024),
prefetch: self
.saved_configuration
.as_ref()
.and_then(|x| x.partial_sync_prefetch)
.unwrap_or_default(),
};
Some(partial_sync_opts)
}
}
pub fn update_configuration(&mut self, configuration: DatabaseSavedConfiguration) -> bool {
let Some(saved_configuration) = &mut self.saved_configuration else {
self.saved_configuration = Some(configuration);
return true;
};
let mut changed = false;
if let Some(remote_url) = configuration.remote_url {
saved_configuration.remote_url = Some(remote_url);
changed |= true;
}
if let Some(partial_sync_prefetch) = configuration.partial_sync_prefetch {
saved_configuration.partial_sync_prefetch = Some(partial_sync_prefetch);
changed |= true;
}
if let Some(partial_sync_segment_size) = configuration.partial_sync_segment_size {
saved_configuration.partial_sync_segment_size = Some(partial_sync_segment_size);
changed |= true;
}
changed
}
pub fn load(data: &[u8]) -> Result<Self> {
let value: serde_json::Value = serde_json::from_slice(data)?;
match value.get("version").and_then(serde_json::Value::as_str) {
Some(version) => {
let version = version.to_string();
let meta: DatabaseMetadata = serde_json::from_value(value).map_err(|err|
Error::JsonDecode(format!("unable to parse metadata file with version {version}: {err}"))
)?;
Ok(meta)
}
None => Err(Error::JsonDecode(
"unexpected metadata file format, 'version' field must be present and have string type".to_string(),
)),
}
}
pub fn dump(&self) -> Result<Vec<u8>> {
let data = serde_json::to_string(self)?;
Ok(data.into_bytes())
}
}
#[derive(Clone)]
pub struct DatabaseChange {
pub change_id: i64,
pub change_time: u64,
pub change_txn_id: Option<i64>,
pub change_type: DatabaseChangeType,
pub table_name: String,
pub id: i64,
pub before: Option<Vec<u8>>,
pub after: Option<Vec<u8>>,
pub updates: Option<Vec<u8>>,
}
impl DatabaseChange {
pub fn into_apply(self) -> Result<DatabaseTapeRowChange> {
let tape_change = match self.change_type {
DatabaseChangeType::Delete => DatabaseTapeRowChangeType::Delete {
before: parse_bin_record(self.before.ok_or_else(|| {
Error::DatabaseTapeError(
"cdc_mode must be set to either 'full' or 'before'".to_string(),
)
})?)?,
},
DatabaseChangeType::Update => DatabaseTapeRowChangeType::Update {
before: parse_bin_record(self.before.ok_or_else(|| {
Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string())
})?)?,
after: parse_bin_record(self.after.ok_or_else(|| {
Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string())
})?)?,
updates: if let Some(updates) = self.updates {
Some(parse_bin_record(updates)?)
} else {
None
},
},
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Insert {
after: parse_bin_record(self.after.ok_or_else(|| {
Error::DatabaseTapeError(
"cdc_mode must be set to either 'full' or 'after'".to_string(),
)
})?)?,
},
DatabaseChangeType::Commit => {
return Err(Error::DatabaseTapeError(
"Commit changes cannot be converted to row changes".to_string(),
))
}
};
Ok(DatabaseTapeRowChange {
change_id: self.change_id,
change_time: self.change_time,
change: tape_change,
table_name: self.table_name,
id: self.id,
})
}
pub fn into_revert(self) -> Result<DatabaseTapeRowChange> {
let tape_change = match self.change_type {
DatabaseChangeType::Delete => DatabaseTapeRowChangeType::Insert {
after: parse_bin_record(self.before.ok_or_else(|| {
Error::DatabaseTapeError(
"cdc_mode must be set to either 'full' or 'before'".to_string(),
)
})?)?,
},
DatabaseChangeType::Update => DatabaseTapeRowChangeType::Update {
before: parse_bin_record(self.after.ok_or_else(|| {
Error::DatabaseTapeError("cdc_mode must be set to 'full'".to_string())
})?)?,
after: parse_bin_record(self.before.ok_or_else(|| {
Error::DatabaseTapeError(
"cdc_mode must be set to either 'full' or 'before'".to_string(),
)
})?)?,
updates: None,
},
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Delete {
before: parse_bin_record(self.after.ok_or_else(|| {
Error::DatabaseTapeError(
"cdc_mode must be set to either 'full' or 'after'".to_string(),
)
})?)?,
},
DatabaseChangeType::Commit => {
return Err(Error::DatabaseTapeError(
"Commit changes cannot be converted to row changes".to_string(),
))
}
};
Ok(DatabaseTapeRowChange {
change_id: self.change_id,
change_time: self.change_time,
change: tape_change,
table_name: self.table_name,
id: self.id,
})
}
}
impl std::fmt::Debug for DatabaseChange {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DatabaseChange")
.field("change_id", &self.change_id)
.field("change_time", &self.change_time)
.field("change_txn_id", &self.change_txn_id)
.field("change_type", &self.change_type)
.field("table_name", &self.table_name)
.field("id", &self.id)
.field("before.len()", &self.before.as_ref().map(|x| x.len()))
.field("after.len()", &self.after.as_ref().map(|x| x.len()))
.finish()
}
}
impl TryFrom<&turso_core::Row> for DatabaseChange {
type Error = Error;
fn try_from(row: &turso_core::Row) -> Result<Self> {
Self::from_row_v1(row)
}
}
fn parse_change_type(value: i64) -> Result<DatabaseChangeType> {
match value {
-1 => Ok(DatabaseChangeType::Delete),
0 => Ok(DatabaseChangeType::Update),
1 => Ok(DatabaseChangeType::Insert),
2 => Ok(DatabaseChangeType::Commit),
v => Err(Error::DatabaseTapeError(format!(
"unexpected change type: expected -1|0|1|2, got '{v:?}'"
))),
}
}
impl DatabaseChange {
pub fn from_row_v1(row: &turso_core::Row) -> Result<Self> {
let change_id = get_core_value_i64(row, 0)?;
let change_time = get_core_value_i64(row, 1)? as u64;
let change_type = get_core_value_i64(row, 2)?;
let table_name = get_core_value_text(row, 3)?;
let id = get_core_value_i64(row, 4)?;
let before = get_core_value_blob_or_null(row, 5)?;
let after = get_core_value_blob_or_null(row, 6)?;
let updates = get_core_value_blob_or_null(row, 7)?;
let change_type = parse_change_type(change_type)?;
Ok(Self {
change_id,
change_time,
change_txn_id: None,
change_type,
table_name,
id,
before,
after,
updates,
})
}
pub fn from_row_v2(row: &turso_core::Row) -> Result<Self> {
let change_id = get_core_value_i64(row, 0)?;
let change_time = get_core_value_i64(row, 1)? as u64;
let change_txn_id = get_core_value_i64_or_null(row, 2)?;
let change_type = get_core_value_i64(row, 3)?;
let change_type = parse_change_type(change_type)?;
let table_name = get_core_value_text_or_null(row, 4)?.unwrap_or_default();
let id = get_core_value_i64_or_null(row, 5)?.unwrap_or(0);
let before = get_core_value_blob_or_null(row, 6)?;
let after = get_core_value_blob_or_null(row, 7)?;
let updates = get_core_value_blob_or_null(row, 8)?;
Ok(Self {
change_id,
change_time,
change_txn_id,
change_type,
table_name,
id,
before,
after,
updates,
})
}
pub fn from_row(row: &turso_core::Row, cdc_version: turso_core::CdcVersion) -> Result<Self> {
match cdc_version {
turso_core::CdcVersion::V2 => Self::from_row_v2(row),
turso_core::CdcVersion::V1 => Self::from_row_v1(row),
}
}
}
pub struct DatabaseRowMutation {
pub change_time: u64,
pub table_name: String,
pub id: i64,
pub change_type: DatabaseChangeType,
pub before: Option<HashMap<String, turso_core::Value>>,
pub after: Option<HashMap<String, turso_core::Value>>,
pub updates: Option<HashMap<String, turso_core::Value>>,
}
#[derive(Debug, Clone)]
pub struct DatabaseStatementReplay {
pub sql: String,
pub values: Vec<turso_core::Value>,
}
#[derive(Debug, Clone)]
pub enum DatabaseRowTransformResult {
Keep,
Skip,
Rewrite(DatabaseStatementReplay),
}
#[derive(Clone)]
pub enum DatabaseTapeRowChangeType {
Delete {
before: Vec<turso_core::Value>,
},
Update {
before: Vec<turso_core::Value>,
after: Vec<turso_core::Value>,
updates: Option<Vec<turso_core::Value>>,
},
Insert {
after: Vec<turso_core::Value>,
},
}
impl From<&DatabaseTapeRowChangeType> for DatabaseChangeType {
fn from(value: &DatabaseTapeRowChangeType) -> Self {
match value {
DatabaseTapeRowChangeType::Delete { .. } => DatabaseChangeType::Delete,
DatabaseTapeRowChangeType::Update { .. } => DatabaseChangeType::Update,
DatabaseTapeRowChangeType::Insert { .. } => DatabaseChangeType::Insert,
}
}
}
#[derive(Debug)]
pub enum DatabaseTapeOperation {
StmtReplay(DatabaseStatementReplay),
RowChange(DatabaseTapeRowChange),
Commit,
}
#[derive(Debug, Clone)]
pub struct DatabaseTapeRowChange {
pub change_id: i64,
pub change_time: u64,
pub change: DatabaseTapeRowChangeType,
pub table_name: String,
pub id: i64,
}
impl std::fmt::Debug for DatabaseTapeRowChangeType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Delete { before } => f
.debug_struct("Delete")
.field("before.len()", &before.len())
.finish(),
Self::Update {
before,
after,
updates,
} => f
.debug_struct("Update")
.field("before.len()", &before.len())
.field("after.len()", &after.len())
.field("updates.len()", &updates.as_ref().map(|x| x.len()))
.finish(),
Self::Insert { after } => f
.debug_struct("Insert")
.field("after.len()", &after.len())
.finish(),
}
}
}
fn get_core_value_i64(row: &turso_core::Row, index: usize) -> Result<i64> {
match row.get_value(index) {
turso_core::Value::Numeric(turso_core::Numeric::Integer(v)) => Ok(*v),
v => Err(Error::DatabaseTapeError(format!(
"column {index} type mismatch: expected integer, got '{v:?}'"
))),
}
}
fn get_core_value_i64_or_null(row: &turso_core::Row, index: usize) -> Result<Option<i64>> {
match row.get_value(index) {
turso_core::Value::Numeric(turso_core::Numeric::Integer(v)) => Ok(Some(*v)),
turso_core::Value::Null => Ok(None),
v => Err(Error::DatabaseTapeError(format!(
"column {index} type mismatch: expected integer or null, got '{v:?}'"
))),
}
}
fn get_core_value_text(row: &turso_core::Row, index: usize) -> Result<String> {
match row.get_value(index) {
turso_core::Value::Text(x) => Ok(x.to_string()),
v => Err(Error::DatabaseTapeError(format!(
"column {index} type mismatch: expected string, got '{v:?}'"
))),
}
}
fn get_core_value_text_or_null(row: &turso_core::Row, index: usize) -> Result<Option<String>> {
match row.get_value(index) {
turso_core::Value::Text(x) => Ok(Some(x.to_string())),
turso_core::Value::Null => Ok(None),
v => Err(Error::DatabaseTapeError(format!(
"column {index} type mismatch: expected string or null, got '{v:?}'"
))),
}
}
fn get_core_value_blob_or_null(row: &turso_core::Row, index: usize) -> Result<Option<Vec<u8>>> {
match row.get_value(index) {
turso_core::Value::Null => Ok(None),
turso_core::Value::Blob(x) => Ok(Some(x.clone())),
v => Err(Error::DatabaseTapeError(format!(
"column {index} type mismatch: expected blob, got '{v:?}'"
))),
}
}
pub enum SyncEngineIoResult {
IO,
}
pub fn parse_bin_record(bin_record: Vec<u8>) -> Result<Vec<turso_core::Value>> {
match turso_core::types::ImmutableRecord::from_bin_record(bin_record).get_values_owned() {
Ok(values) => Ok(values),
Err(err) => Err(Error::DatabaseTapeError(format!(
"unable to parse bin record: {err}"
))),
}
}