Skip to main content

teaql_provider_sqlite/
lib.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::future::Future;
3use std::pin::Pin;
4use std::str::FromStr;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
8use rusqlite::types::{Value as SqliteValue, ValueRef};
9use rusqlite::{Connection, Row, params_from_iter};
10use rust_decimal::Decimal;
11use teaql_core::{
12    DataType, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record, SelectQuery,
13    UpdateCommand, Value,
14};
15use teaql_runtime::{
16    RawAuditEvent, GraphNode, InternalIdGenerator,
17    RuntimeError, SchemaProvider, UserContext,
18};
19use teaql_sql::{
20    CompiledQuery, DatabaseKind, SqlCompileError, SqlDialect,
21    SqlTransport, quote_identifier_if_needed,
22};
23
/// Table name used for the id-space (per-entity id counter) table when no
/// other name is configured.
pub const DEFAULT_ID_SPACE_TABLE: &str = "teaql_id_space";

/// Marker prepended to decimal values when they are bound as SQLite text, so
/// that decoding can recognise the text and parse the remainder as a decimal.
const SQLITE_DECIMAL_PREFIX: &str = "__teaql_decimal__:";
27
/// Stateless marker type carrying the SQLite implementation of `SqlDialect`.
#[derive(Debug, Default, Clone, Copy)]
pub struct SqliteDialect;
30
31impl SqlDialect for SqliteDialect {
32    fn kind(&self) -> DatabaseKind {
33        DatabaseKind::Sqlite
34    }
35
36    fn quote_ident(&self, ident: &str) -> String {
37        quote_ident(ident)
38    }
39
40    fn placeholder(&self, _index: usize) -> String {
41        "?".to_owned()
42    }
43
44    fn schema_type_sql(
45        &self,
46        data_type: DataType,
47        property: &PropertyDescriptor,
48    ) -> Result<&'static str, SqlCompileError> {
49        match data_type {
50            DataType::Bool => Ok("INTEGER"),
51            DataType::I64 | DataType::U64 if property.is_id => Ok("INTEGER"),
52            DataType::I64 | DataType::U64 => Ok("INTEGER"),
53            DataType::F64 => Ok("REAL"),
54            DataType::Decimal => Ok("NUMERIC"),
55            DataType::Text => Ok("TEXT"),
56            DataType::Json => Ok("JSON"),
57            DataType::Date => Ok("DATE"),
58            DataType::Timestamp => Ok("TIMESTAMP"),
59        }
60    }
61
62    fn compile_add_column(
63        &self,
64        entity: &EntityDescriptor,
65        property: &PropertyDescriptor,
66    ) -> Result<String, SqlCompileError> {
67        // SQLite does not support adding NOT NULL columns without a DEFAULT.
68        // Since TeaQL enforces nullability at the application layer, we can safely
69        // strip the NOT NULL constraint when adding columns to existing tables.
70        let def = self.column_definition_sql(property)?;
71        let def_without_not_null = def.replace(" NOT NULL", "");
72        
73        Ok(format!(
74            "ALTER TABLE {} ADD COLUMN {}",
75            self.quote_ident(&entity.table_name),
76            def_without_not_null
77        ))
78    }
79}
80
/// Errors produced by the SQLite executor, schema helpers and id generator in this file.
#[derive(Debug)]
pub enum MutationExecutorError {
    /// Failure reported by `rusqlite`.
    Sqlite(rusqlite::Error),
    /// Failure while compiling a query or schema statement to SQL.
    SqlCompile(SqlCompileError),
    /// A value kind that cannot be bound as a SQLite parameter.
    UnsupportedValue(&'static str),
    /// A stored column type that cannot be decoded into a record value.
    UnsupportedColumnType(String),
    /// Conversion, lookup or decoding problem described by the message.
    Bind(String),
    /// The shared connection mutex could not be locked.
    Lock(String),
}
90
91impl std::fmt::Display for MutationExecutorError {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        match self {
94            Self::Sqlite(err) => err.fmt(f),
95            Self::SqlCompile(err) => err.fmt(f),
96            Self::UnsupportedValue(kind) => {
97                write!(
98                    f,
99                    "unsupported rusqlite bind value for mutation executor: {kind}"
100                )
101            }
102            Self::UnsupportedColumnType(kind) => {
103                write!(
104                    f,
105                    "unsupported rusqlite column type for record decoding: {kind}"
106                )
107            }
108            Self::Bind(message) => write!(f, "rusqlite bind error: {message}"),
109            Self::Lock(message) => write!(f, "rusqlite connection lock error: {message}"),
110        }
111    }
112}
113
// Marker implementation: only the default `Error` methods are used, so no
// `source()` is exposed for the wrapped errors.
impl std::error::Error for MutationExecutorError {}
115
116impl From<rusqlite::Error> for MutationExecutorError {
117    fn from(value: rusqlite::Error) -> Self {
118        Self::Sqlite(value)
119    }
120}
121
122impl From<SqlCompileError> for MutationExecutorError {
123    fn from(value: SqlCompileError) -> Self {
124        Self::SqlCompile(value)
125    }
126}
127
/// Executes compiled queries against a SQLite connection.
///
/// The connection sits behind `Arc<Mutex<…>>`, so clones of the executor
/// share the same underlying connection.
#[derive(Clone)]
pub struct SqliteMutationExecutor {
    connection: Arc<Mutex<Connection>>,
}
132
133impl SqliteMutationExecutor {
134    pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
135        Self { connection }
136    }
137
138    pub fn from_connection(connection: Connection) -> Self {
139        Self::new(Arc::new(Mutex::new(connection)))
140    }
141
142    pub fn connection(&self) -> Arc<Mutex<Connection>> {
143        Arc::clone(&self.connection)
144    }
145
146    pub fn ensure_schema(
147        &self,
148        dialect: &SqliteDialect,
149        entities: &[&EntityDescriptor],
150    ) -> Result<(), MutationExecutorError> {
151        self.ensure_id_space_table(DEFAULT_ID_SPACE_TABLE)?;
152
153        for entity in entities {
154            if !self.table_exists(&entity.table_name)? {
155                let sql = dialect.compile_create_table(entity)?;
156                self.lock()?.execute(&sql, [])?;
157                continue;
158            }
159
160            let existing_columns = self.table_columns(&entity.table_name)?;
161            for property in &entity.properties {
162                let bare_column = strip_identifier_quotes(&property.column_name).to_lowercase();
163                if existing_columns.contains(&bare_column) {
164                    continue;
165                }
166                let sql = dialect.compile_add_column(entity, property)?;
167                self.lock()?.execute(&sql, [])?;
168            }
169        }
170        Ok(())
171    }
172
173    pub fn ensure_id_space_table(&self, table_name: &str) -> Result<(), MutationExecutorError> {
174        let sql = format!(
175            "CREATE TABLE IF NOT EXISTS {} (type_name VARCHAR(100) PRIMARY KEY, current_level BIGINT NOT NULL)",
176            quote_ident(table_name)
177        );
178        self.lock()?.execute(&sql, [])?;
179        Ok(())
180    }
181
182    pub fn begin_transaction(&self) -> Result<(), MutationExecutorError> {
183        self.lock()?.execute("BEGIN IMMEDIATE", [])?;
184        Ok(())
185    }
186
187    pub fn commit_transaction(&self) -> Result<(), MutationExecutorError> {
188        self.lock()?.execute("COMMIT", [])?;
189        Ok(())
190    }
191
192    pub fn rollback_transaction(&self) -> Result<(), MutationExecutorError> {
193        self.lock()?.execute("ROLLBACK", [])?;
194        Ok(())
195    }
196
197    pub fn execute(&self, query: &CompiledQuery) -> Result<u64, MutationExecutorError> {
198        let params = bind_values(&query.params)?;
199        let rows = self
200            .lock()?
201            .execute(&query.sql_with_comment(), params_from_iter(params.iter()))?;
202        Ok(rows as u64)
203    }
204
205    pub fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, MutationExecutorError> {
206        let params = bind_values(&query.params)?;
207        let connection = self.lock()?;
208        let mut statement = connection.prepare(&query.sql_with_comment())?;
209        let columns = statement_columns(&statement);
210        let mut rows = statement.query(params_from_iter(params.iter()))?;
211        let mut records = Vec::new();
212        while let Some(row) = rows.next()? {
213            records.push(decode_sqlite_row(row, &columns)?);
214        }
215        Ok(records)
216    }
217
218    pub fn table_exists(&self, table_name: &str) -> Result<bool, MutationExecutorError> {
219        let exists: i64 = self.lock()?.query_row(
220            "SELECT COUNT(1) FROM sqlite_master WHERE type = 'table' AND name = ?",
221            [table_name],
222            |row| row.get(0),
223        )?;
224        Ok(exists > 0)
225    }
226
227    pub fn table_columns(&self, table_name: &str) -> Result<BTreeSet<String>, MutationExecutorError> {
228        let pragma_sql = format!("PRAGMA table_info({})", quote_ident(table_name));
229        let connection = self.lock()?;
230        let mut statement = connection.prepare(&pragma_sql)?;
231        let rows = statement.query_map([], |row| row.get::<_, String>("name"))?;
232        let mut columns = BTreeSet::new();
233        for row in rows {
234            columns.insert(row?.to_lowercase());
235        }
236        Ok(columns)
237    }
238
239    fn lock(&self) -> Result<MutexGuard<'_, Connection>, MutationExecutorError> {
240        self.connection
241            .lock()
242            .map_err(|err| MutationExecutorError::Lock(err.to_string()))
243    }
244}
245
246
247impl SqlTransport for SqliteMutationExecutor {
248    type Error = MutationExecutorError;
249
250    async fn fetch_all_sql(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
251        SqliteMutationExecutor::fetch_all(self, query)
252    }
253
254    async fn execute_sql(&self, query: &CompiledQuery) -> Result<u64, Self::Error> {
255        SqliteMutationExecutor::execute(self, query)
256    }
257}
258
259impl teaql_sql::SqlTransaction for SqliteMutationExecutor {
260    type Error = MutationExecutorError;
261
262    async fn commit_sql(self) -> Result<(), Self::Error> {
263        self.commit_transaction()
264    }
265
266    async fn rollback_sql(self) -> Result<(), Self::Error> {
267        self.rollback_transaction()
268    }
269}
270
271impl teaql_sql::SqlTransactionTransport for SqliteMutationExecutor {
272    type Tx<'a> = Self where Self: 'a;
273
274    async fn begin_sql(&self) -> Result<Self::Tx<'_>, Self::Error> {
275        self.begin_transaction()?;
276        Ok(self.clone())
277    }
278}
279
280
281
282fn initial_graph_exists_sqlite(
283    executor: &SqliteMutationExecutor,
284    dialect: &SqliteDialect,
285    entity: &EntityDescriptor,
286    graph: &GraphNode,
287) -> Result<bool, MutationExecutorError> {
288    let Some(id) = graph.values.get("id") else {
289        return Ok(false);
290    };
291    let query = dialect.compile_select(
292        entity,
293        &SelectQuery::new(&graph.entity)
294            .project("id")
295            .filter(Expr::eq("id", id.clone()))
296            .limit(1),
297    )?;
298    Ok(!executor.fetch_all(&query)?.is_empty())
299}
300
301fn compile_initial_graph_insert(
302    dialect: &impl SqlDialect,
303    entity: &EntityDescriptor,
304    graph: &GraphNode,
305) -> Result<CompiledQuery, MutationExecutorError> {
306    let mut command = InsertCommand::new(&graph.entity);
307    for (field, value) in &graph.values {
308        command = command.value(field.clone(), value.clone());
309    }
310    dialect.compile_insert(entity, &command).map_err(Into::into)
311}
312
313fn compile_initial_graph_update(
314    dialect: &impl SqlDialect,
315    entity: &EntityDescriptor,
316    graph: &GraphNode,
317) -> Result<Option<CompiledQuery>, MutationExecutorError> {
318    let Some(id) = graph.values.get("id") else {
319        return Ok(None);
320    };
321    let mut command = UpdateCommand::new(&graph.entity, id.clone());
322    for (field, value) in &graph.values {
323        if field == "id" {
324            continue;
325        }
326        command = command.value(field.clone(), value.clone());
327    }
328    match dialect.compile_update(entity, &command) {
329        Ok(query) => Ok(Some(query)),
330        Err(SqlCompileError::EmptyMutation(_)) => Ok(None),
331        Err(err) => Err(err.into()),
332    }
333}
334
/// Extension trait exposing SQLite schema setup as a boxed future.
pub trait SqliteSchemaExt {
    /// Returns a `Send` future, borrowing `self`, that resolves to the
    /// outcome of the schema setup.
    fn ensure_sqlite_schema(
        &self,
    ) -> Pin<Box<dyn Future<Output = Result<(), MutationExecutorError>> + Send + '_>>;
}
340
341pub fn ensure_sqlite_schema_for(ctx: &UserContext) -> Result<(), MutationExecutorError> {
342    let dialect = ctx.get_resource::<SqliteDialect>().ok_or_else(|| {
343        MutationExecutorError::Bind("missing typed resource: SqliteDialect".to_owned())
344    })?;
345    let executor = ctx
346        .get_resource::<SqliteMutationExecutor>()
347        .ok_or_else(|| {
348            MutationExecutorError::Bind(
349                "missing typed resource: SqliteMutationExecutor".to_owned(),
350            )
351        })?;
352
353    let entities = ctx.all_entities();
354
355    // Ensure id space table exists
356    executor.ensure_id_space_table(DEFAULT_ID_SPACE_TABLE)?;
357
358    // Process each entity table individually with granular events
359    for entity in &entities {
360        let field_count = entity.properties.len();
361        if !executor.table_exists(&entity.table_name)? {
362            // New table: create it
363            let sql = dialect.compile_create_table(entity)?;
364            executor.lock()?.execute(&sql, [])?;
365            let _ = ctx.send_event(RawAuditEvent::schema_created(
366                &entity.name,
367                &entity.table_name,
368                field_count,
369            ));
370        } else {
371            // Existing table: check for missing columns
372            let existing_columns = executor.table_columns(&entity.table_name)?;
373            let mut fields_added = 0;
374            for property in &entity.properties {
375                let bare_column = strip_identifier_quotes(&property.column_name).to_lowercase();
376                if existing_columns.contains(&bare_column) {
377                    continue;
378                }
379                let sql = dialect.compile_add_column(entity, property)?;
380                executor.lock()?.execute(&sql, [])?;
381                let _ = ctx.send_event(RawAuditEvent::field_added(
382                    &entity.name,
383                    &entity.table_name,
384                    &property.column_name,
385                ));
386                fields_added += 1;
387            }
388            let _ = ctx.send_event(RawAuditEvent::schema_verified(
389                &entity.name,
390                &entity.table_name,
391                field_count,
392            ));
393            let _ = fields_added; // used above for FieldAdded events
394        }
395    }
396
397    // Seed initial data, tracking insert vs update counts per entity
398    let mut seed_counts: BTreeMap<String, (usize, usize)> = BTreeMap::new(); // (inserted, updated)
399    for graph in ctx.initial_graphs() {
400        let entity = ctx.entity(&graph.entity).ok_or_else(|| {
401            MutationExecutorError::Bind(format!("missing entity: {}", graph.entity))
402        })?;
403        let counts = seed_counts.entry(graph.entity.clone()).or_insert((0, 0));
404        if initial_graph_exists_sqlite(executor, dialect, entity, graph)? {
405            if let Some(query) = compile_initial_graph_update(dialect, entity, graph)? {
406                executor.execute(&query)?;
407            }
408            counts.1 += 1; // updated
409        } else {
410            let query = compile_initial_graph_insert(dialect, entity, graph)?;
411            executor.execute(&query)?;
412            counts.0 += 1; // inserted
413        }
414    }
415
416    // Fire DataSeeded events per entity type
417    for (entity_name, (inserted, updated)) in &seed_counts {
418        let entity = ctx.entity(entity_name).ok_or_else(|| {
419            MutationExecutorError::Bind(format!("missing entity: {}", entity_name))
420        })?;
421        let _ = ctx.send_event(RawAuditEvent::data_seeded(
422            entity_name,
423            &entity.table_name,
424            *inserted,
425            *updated,
426        ));
427    }
428
429    Ok(())
430}
431
432impl SqliteSchemaExt for UserContext {
433    fn ensure_sqlite_schema(
434        &self,
435    ) -> Pin<Box<dyn Future<Output = Result<(), MutationExecutorError>> + Send + '_>> {
436        Box::pin(async move { ensure_sqlite_schema_for(self) })
437    }
438}
439
/// Stateless `SchemaProvider` that performs the SQLite schema setup of this file.
#[derive(Debug, Default, Clone, Copy)]
pub struct SqliteSchemaProvider;
442
443impl SchemaProvider for SqliteSchemaProvider {
444    fn ensure_schema<'a>(
445        &'a self,
446        ctx: &'a UserContext,
447    ) -> Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>> {
448        Box::pin(async move {
449            ensure_sqlite_schema_for(ctx).map_err(|err| RuntimeError::Schema(err.to_string()))
450        })
451    }
452}
453
/// Extension trait for wiring a SQLite executor into a context.
pub trait SqliteProviderExt {
    /// Installs `executor` and returns `self` so that calls can be chained.
    fn use_sqlite_provider(&mut self, executor: SqliteMutationExecutor) -> &mut Self;
}
457
458impl SqliteProviderExt for UserContext {
459    fn use_sqlite_provider(&mut self, executor: SqliteMutationExecutor) -> &mut Self {
460        self.insert_resource(SqliteDialect);
461        self.insert_resource(executor);
462        self.set_schema_provider(SqliteSchemaProvider);
463        self
464    }
465}
466
/// Generates ids from a per-entity counter kept in an id-space table.
#[derive(Clone)]
pub struct SqliteIdSpaceGenerator {
    // Executor whose connection holds the id-space table.
    executor: SqliteMutationExecutor,
    // Name of the id-space table; starts out as `DEFAULT_ID_SPACE_TABLE`.
    table_name: String,
}
472
473impl SqliteIdSpaceGenerator {
474    pub fn new(connection: Connection) -> Self {
475        Self::from_executor(SqliteMutationExecutor::from_connection(connection))
476    }
477
478    pub fn from_executor(executor: SqliteMutationExecutor) -> Self {
479        Self {
480            executor,
481            table_name: DEFAULT_ID_SPACE_TABLE.to_owned(),
482        }
483    }
484
485    pub fn with_table_name(mut self, table_name: impl Into<String>) -> Self {
486        self.table_name = table_name.into();
487        self
488    }
489
490    pub fn ensure_table(&self) -> Result<(), MutationExecutorError> {
491        self.executor.ensure_id_space_table(&self.table_name)
492    }
493
494    pub fn next_id(&self, entity: &str) -> Result<u64, MutationExecutorError> {
495        self.ensure_table()?;
496        let sql = format!(
497            "INSERT INTO {} (type_name, current_level) VALUES (?, 1) \
498             ON CONFLICT (type_name) DO UPDATE \
499             SET current_level = current_level + 1 \
500             RETURNING current_level",
501            quote_ident(&self.table_name)
502        );
503        let id: i64 = self
504            .executor
505            .lock()?
506            .query_row(&sql, [entity], |row| row.get(0))?;
507        u64::try_from(id).map_err(|_| {
508            MutationExecutorError::Bind(format!("generated id {id} cannot be represented as u64"))
509        })
510    }
511}
512
513impl InternalIdGenerator for SqliteIdSpaceGenerator {
514    fn generate_id(&self, entity: &str) -> Result<u64, RuntimeError> {
515        self.next_id(entity)
516            .map_err(|err| RuntimeError::IdGeneration(err.to_string()))
517    }
518}
519
520fn quote_ident(ident: &str) -> String {
521    quote_identifier_if_needed(ident, '"')
522}
523
/// Remove one pair of wrapping identifier quotes from a SQL identifier.
///
/// `PRAGMA table_info` in SQLite reports bare column names (for example
/// `description`), whereas a generated `PropertyDescriptor::column_name` can
/// arrive quoted (for example `"description"`) when the name is a reserved
/// keyword. Stripping the wrapper lets the two be compared during schema
/// migration.
///
/// Recognised wrappers are `"…"`, `` `…` `` and `[…]`. An identifier whose
/// first and last characters do not form one of these pairs is returned
/// unchanged.
fn strip_identifier_quotes(ident: &str) -> &str {
    const WRAPPERS: [(char, char); 3] = [('"', '"'), ('`', '`'), ('[', ']')];

    WRAPPERS
        .iter()
        .find_map(|&(open, close)| ident.strip_prefix(open)?.strip_suffix(close))
        .unwrap_or(ident)
}
544
545fn bind_values(values: &[Value]) -> Result<Vec<SqliteValue>, MutationExecutorError> {
546    values.iter().map(bind_sqlite_value).collect()
547}
548
549fn bind_sqlite_value(value: &Value) -> Result<SqliteValue, MutationExecutorError> {
550    match value {
551        Value::Null => Ok(SqliteValue::Null),
552        Value::Bool(v) => Ok(SqliteValue::Integer(i64::from(*v))),
553        Value::I64(v) => Ok(SqliteValue::Integer(*v)),
554        Value::U64(v) => i64::try_from(*v)
555            .map(SqliteValue::Integer)
556            .map_err(|_| MutationExecutorError::Bind(format!("u64 value {v} exceeds i64 range"))),
557        Value::F64(v) => Ok(SqliteValue::Real(*v)),
558        Value::Decimal(v) => Ok(SqliteValue::Text(format!("{SQLITE_DECIMAL_PREFIX}{v}"))),
559        Value::Text(v) => Ok(SqliteValue::Text(v.clone())),
560        Value::Json(v) => Ok(SqliteValue::Text(v.to_string())),
561        Value::Date(v) => Ok(SqliteValue::Text(v.format("%Y-%m-%d").to_string())),
562        Value::Timestamp(v) => Ok(SqliteValue::Text(v.to_rfc3339())),
563        Value::Object(_) => Err(MutationExecutorError::UnsupportedValue("object")),
564        Value::List(_) => Err(MutationExecutorError::UnsupportedValue("list")),
565    }
566}
567
/// Result-column metadata captured from a prepared statement and used while
/// decoding rows.
#[derive(Debug, Clone)]
struct ColumnInfo {
    // Column name as reported by the statement.
    name: String,
    // Declared column type, upper-cased by `statement_columns`; `None` when
    // the driver reports no declared type.
    decl_type: Option<String>,
}
573
574fn statement_columns(statement: &rusqlite::Statement<'_>) -> Vec<ColumnInfo> {
575    statement
576        .columns()
577        .into_iter()
578        .map(|column| ColumnInfo {
579            name: column.name().to_owned(),
580            decl_type: column.decl_type().map(|value| value.to_ascii_uppercase()),
581        })
582        .collect()
583}
584
585fn decode_sqlite_row(
586    row: &Row<'_>,
587    columns: &[ColumnInfo],
588) -> Result<Record, MutationExecutorError> {
589    let mut record = BTreeMap::new();
590    for (index, column) in columns.iter().enumerate() {
591        let value_ref = row.get_ref(index)?;
592        let value = match value_ref {
593            ValueRef::Null => Value::Null,
594            ValueRef::Integer(value) => decode_sqlite_integer(value, column),
595            ValueRef::Real(value) => Value::F64(value),
596            ValueRef::Text(value) => decode_sqlite_text(value, column)?,
597            ValueRef::Blob(_) => {
598                return Err(MutationExecutorError::UnsupportedColumnType(
599                    "BLOB".to_owned(),
600                ));
601            }
602        };
603        record.insert(column.name.clone(), value);
604    }
605    Ok(record)
606}
607
608fn decode_sqlite_integer(value: i64, column: &ColumnInfo) -> Value {
609    match column_decl_type(column).as_deref() {
610        Some("BOOLEAN") | Some("BOOL") => Value::Bool(value != 0),
611        _ => Value::I64(value),
612    }
613}
614
615fn decode_sqlite_text(value: &[u8], column: &ColumnInfo) -> Result<Value, MutationExecutorError> {
616    let value = std::str::from_utf8(value)
617        .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite text: {err}")))?;
618    if let Some(decimal) = value.strip_prefix(SQLITE_DECIMAL_PREFIX) {
619        return Decimal::from_str(decimal)
620            .map(Value::Decimal)
621            .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite decimal: {err}")));
622    }
623
624    match column_decl_type(column).as_deref() {
625        Some("NUMERIC") | Some("DECIMAL") => Decimal::from_str(value)
626            .map(Value::Decimal)
627            .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite decimal: {err}"))),
628        Some("JSON") => serde_json::from_str(value).map(Value::Json).map_err(|err| {
629            MutationExecutorError::Bind(format!("invalid sqlite json value: {err}"))
630        }),
631        Some("DATE") => NaiveDate::parse_from_str(value, "%Y-%m-%d")
632            .map(Value::Date)
633            .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite date: {err}"))),
634        Some("TIMESTAMP") | Some("DATETIME") => parse_sqlite_timestamp(value),
635        _ => infer_sqlite_text(value),
636    }
637}
638
639fn infer_sqlite_text(value: &str) -> Result<Value, MutationExecutorError> {
640    if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
641        return Ok(Value::Date(date));
642    }
643    if let Ok(timestamp) = DateTime::parse_from_rfc3339(value) {
644        return Ok(Value::Timestamp(timestamp.with_timezone(&Utc)));
645    }
646    if let Ok(timestamp) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
647        return Ok(Value::Timestamp(Utc.from_utc_datetime(&timestamp)));
648    }
649    Ok(Value::Text(value.to_owned()))
650}
651
652fn parse_sqlite_timestamp(value: &str) -> Result<Value, MutationExecutorError> {
653    if let Ok(timestamp) = DateTime::parse_from_rfc3339(value) {
654        return Ok(Value::Timestamp(timestamp.with_timezone(&Utc)));
655    }
656    NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S")
657        .map(|timestamp| Value::Timestamp(Utc.from_utc_datetime(&timestamp)))
658        .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite timestamp: {err}")))
659}
660
661fn column_decl_type(column: &ColumnInfo) -> Option<String> {
662    column
663        .decl_type
664        .as_ref()
665        .map(|value| value.split('(').next().unwrap_or(value).trim().to_owned())
666}
667
// Unit tests for the dialect's generated SQL and for the executor and id
// generator, run against in-memory SQLite databases.
#[cfg(test)]
mod tests {
    use super::*;
    use teaql_core::{DeleteCommand, RecoverCommand};
    use teaql_runtime::InMemoryMetadataStore;

    // Shared fixture: an `Order` entity stored in `orders` with an id, a
    // version column and a nullable name.
    fn entity() -> EntityDescriptor {
        EntityDescriptor::new("Order")
            .table_name("orders")
            .property(
                PropertyDescriptor::new("id", DataType::U64)
                    .column_name("id")
                    .id()
                    .not_null(),
            )
            .property(
                PropertyDescriptor::new("version", DataType::I64)
                    .column_name("version")
                    .version()
                    .not_null(),
            )
            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
    }

    // Pins the exact SQL text produced for insert, update, delete, recover
    // and create-table.
    #[test]
    fn sqlite_dialect_compiles_mutations_and_schema() {
        let insert = SqliteDialect
            .compile_insert(
                &entity(),
                &InsertCommand::new("Order")
                    .value("id", 1_u64)
                    .value("name", "A"),
            )
            .unwrap();
        assert_eq!(insert.sql, "INSERT INTO orders (id, name) VALUES (?, ?)");

        let update = SqliteDialect
            .compile_update(
                &entity(),
                &UpdateCommand::new("Order", 1_u64)
                    .expected_version(3)
                    .value("name", "B"),
            )
            .unwrap();
        assert_eq!(
            update.sql,
            "UPDATE orders SET name = ?, version = ? WHERE id = ? AND version = ?"
        );

        let delete = SqliteDialect
            .compile_delete(
                &entity(),
                &DeleteCommand::new("Order", 1_u64).expected_version(3),
            )
            .unwrap();
        let recover = SqliteDialect
            .compile_recover(&entity(), &RecoverCommand::new("Order", 1_u64, -4))
            .unwrap();
        // Delete and recover are both expected to compile to a version update.
        assert_eq!(
            delete.sql,
            "UPDATE orders SET version = ? WHERE id = ? AND version = ?"
        );
        assert_eq!(
            recover.sql,
            "UPDATE orders SET version = ? WHERE id = ? AND version = ?"
        );

        let create = SqliteDialect.compile_create_table(&entity()).unwrap();
        assert_eq!(
            create,
            "CREATE TABLE IF NOT EXISTS orders (id INTEGER PRIMARY KEY NOT NULL, version INTEGER NOT NULL, name TEXT)"
        );
    }

    // Sets up the schema through the context, inserts one row and reads it back.
    #[test]
    fn sqlite_executor_ensures_schema_and_roundtrips_rows() {
        let executor = SqliteMutationExecutor::from_connection(Connection::open_in_memory().unwrap());
        let entity = entity();
        let mut ctx = UserContext::new()
            .with_metadata(InMemoryMetadataStore::new().with_entity(entity.clone()));

        ctx.use_sqlite_provider(executor.clone());
        ensure_sqlite_schema_for(&ctx).unwrap();

        let insert = SqliteDialect
            .compile_insert(
                &entity,
                &InsertCommand::new("Order")
                    .value("id", 1_u64)
                    .value("version", 1_i64)
                    .value("name", "draft"),
            )
            .unwrap();
        assert_eq!(executor.execute(&insert).unwrap(), 1);

        let select = SqliteDialect
            .compile_select(
                &entity,
                &SelectQuery::new("Order")
                    .filter(Expr::eq("id", 1_u64))
                    .order_asc("id"),
            )
            .unwrap();
        let rows = executor.fetch_all(&select).unwrap();
        assert_eq!(rows.len(), 1);
        // The id was written as a u64 but is decoded as I64.
        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
        assert_eq!(rows[0].get("version"), Some(&Value::I64(1)));
        assert_eq!(rows[0].get("name"), Some(&Value::Text("draft".to_owned())));
    }

    // JSON-looking text in a TEXT column must stay text; only the column
    // declared JSON is parsed.
    #[test]
    fn sqlite_executor_parses_json_only_for_json_columns() {
        let executor = SqliteMutationExecutor::from_connection(Connection::open_in_memory().unwrap());

        executor
            .execute(&CompiledQuery {
                sql: "CREATE TABLE payloads (text_payload TEXT, json_payload JSON)".to_owned(),
                params: Vec::new(),
                comment: None,
            })
            .unwrap();
        executor
            .execute(&CompiledQuery {
                sql: "INSERT INTO payloads (text_payload, json_payload) VALUES (?, ?)".to_owned(),
                params: vec![
                    Value::Text("{\"active\":true}".to_owned()),
                    Value::Json(serde_json::json!({"active": true})),
                ],
                comment: None,
            })
            .unwrap();

        let rows = executor
            .fetch_all(&CompiledQuery {
                sql: "SELECT text_payload, json_payload FROM payloads".to_owned(),
                params: Vec::new(),
                comment: None,
            })
            .unwrap();

        assert_eq!(
            rows[0].get("text_payload"),
            Some(&Value::Text("{\"active\":true}".to_owned()))
        );
        assert_eq!(
            rows[0].get("json_payload"),
            Some(&Value::Json(serde_json::json!({"active": true})))
        );
    }

    // The first id for an entity is 1 and each further call adds one.
    #[test]
    fn sqlite_id_space_generator_increments_ids() {
        let executor = SqliteMutationExecutor::from_connection(Connection::open_in_memory().unwrap());
        let generator = SqliteIdSpaceGenerator::from_executor(executor);
        assert_eq!(generator.next_id("Order").unwrap(), 1);
        assert_eq!(generator.next_id("Order").unwrap(), 2);
    }
}