Skip to main content

teaql_provider_sqlite/
lib.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::future::Future;
3use std::pin::Pin;
4use std::str::FromStr;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
8use rusqlite::types::{Value as SqliteValue, ValueRef};
9use rusqlite::{Connection, Row, params_from_iter};
10use rust_decimal::Decimal;
11use teaql_core::{
12    DataType, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record, SelectQuery,
13    UpdateCommand, Value,
14};
15use teaql_runtime::{
16    RawAuditEvent, GraphNode, InternalIdGenerator,
17    RuntimeError, SchemaProvider, UserContext,
18};
19use teaql_sql::{
20    CompiledQuery, DatabaseKind, SqlCompileError, SqlDialect,
21    SqlTransport, quote_identifier_if_needed,
22};
23
24pub const DEFAULT_ID_SPACE_TABLE: &str = "teaql_id_space";
25
26const SQLITE_DECIMAL_PREFIX: &str = "__teaql_decimal__:";
27
28#[derive(Debug, Default, Clone, Copy)]
29pub struct SqliteDialect;
30
31impl SqlDialect for SqliteDialect {
32    fn kind(&self) -> DatabaseKind {
33        DatabaseKind::Sqlite
34    }
35
36    fn quote_ident(&self, ident: &str) -> String {
37        quote_ident(ident)
38    }
39
40    fn placeholder(&self, _index: usize) -> String {
41        "?".to_owned()
42    }
43
44    fn schema_type_sql(
45        &self,
46        data_type: DataType,
47        property: &PropertyDescriptor,
48    ) -> Result<&'static str, SqlCompileError> {
49        match data_type {
50            DataType::Bool => Ok("INTEGER"),
51            DataType::I64 | DataType::U64 if property.is_id => Ok("INTEGER"),
52            DataType::I64 | DataType::U64 => Ok("INTEGER"),
53            DataType::F64 => Ok("REAL"),
54            DataType::Decimal => Ok("NUMERIC"),
55            DataType::Text => Ok("TEXT"),
56            DataType::Json => Ok("JSON"),
57            DataType::Date => Ok("DATE"),
58            DataType::Timestamp => Ok("TIMESTAMP"),
59        }
60    }
61
62    fn compile_add_column(
63        &self,
64        entity: &EntityDescriptor,
65        property: &PropertyDescriptor,
66    ) -> Result<String, SqlCompileError> {
67        // SQLite does not support adding NOT NULL columns without a DEFAULT.
68        // Since TeaQL enforces nullability at the application layer, we can safely
69        // strip the NOT NULL constraint when adding columns to existing tables.
70        let def = self.column_definition_sql(property)?;
71        let def_without_not_null = def.replace(" NOT NULL", "");
72        
73        Ok(format!(
74            "ALTER TABLE {} ADD COLUMN {}",
75            self.quote_ident(&entity.table_name),
76            def_without_not_null
77        ))
78    }
79}
80
81#[derive(Debug)]
82pub enum MutationExecutorError {
83    Sqlite(rusqlite::Error),
84    SqlCompile(SqlCompileError),
85    UnsupportedValue(&'static str),
86    UnsupportedColumnType(String),
87    Bind(String),
88    Lock(String),
89}
90
91impl std::fmt::Display for MutationExecutorError {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        match self {
94            Self::Sqlite(err) => err.fmt(f),
95            Self::SqlCompile(err) => err.fmt(f),
96            Self::UnsupportedValue(kind) => {
97                write!(
98                    f,
99                    "unsupported rusqlite bind value for mutation executor: {kind}"
100                )
101            }
102            Self::UnsupportedColumnType(kind) => {
103                write!(
104                    f,
105                    "unsupported rusqlite column type for record decoding: {kind}"
106                )
107            }
108            Self::Bind(message) => write!(f, "rusqlite bind error: {message}"),
109            Self::Lock(message) => write!(f, "rusqlite connection lock error: {message}"),
110        }
111    }
112}
113
114impl std::error::Error for MutationExecutorError {}
115
116impl From<rusqlite::Error> for MutationExecutorError {
117    fn from(value: rusqlite::Error) -> Self {
118        Self::Sqlite(value)
119    }
120}
121
122impl From<SqlCompileError> for MutationExecutorError {
123    fn from(value: SqlCompileError) -> Self {
124        Self::SqlCompile(value)
125    }
126}
127
128#[derive(Clone)]
129pub struct SqliteMutationExecutor {
130    connection: Arc<Mutex<Connection>>,
131}
132
133impl SqliteMutationExecutor {
134    pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
135        Self { connection }
136    }
137
138    pub fn from_connection(connection: Connection) -> Self {
139        Self::new(Arc::new(Mutex::new(connection)))
140    }
141
142    pub fn connection(&self) -> Arc<Mutex<Connection>> {
143        Arc::clone(&self.connection)
144    }
145
146    pub fn ensure_schema(
147        &self,
148        dialect: &SqliteDialect,
149        entities: &[&EntityDescriptor],
150    ) -> Result<(), MutationExecutorError> {
151        self.ensure_id_space_table(DEFAULT_ID_SPACE_TABLE)?;
152
153        for entity in entities {
154            if !self.table_exists(&entity.table_name)? {
155                let sql = dialect.compile_create_table(entity)?;
156                self.lock()?.execute(&sql, [])?;
157                continue;
158            }
159
160            let existing_columns = self.table_columns(&entity.table_name)?;
161            for property in &entity.properties {
162                let bare_column = strip_identifier_quotes(&property.column_name).to_lowercase();
163                if existing_columns.contains(&bare_column) {
164                    continue;
165                }
166                let sql = dialect.compile_add_column(entity, property)?;
167                self.lock()?.execute(&sql, [])?;
168            }
169        }
170        Ok(())
171    }
172
173    pub fn ensure_id_space_table(&self, table_name: &str) -> Result<(), MutationExecutorError> {
174        let sql = format!(
175            "CREATE TABLE IF NOT EXISTS {} (type_name VARCHAR(100) PRIMARY KEY, current_level BIGINT NOT NULL)",
176            quote_ident(table_name)
177        );
178        self.lock()?.execute(&sql, [])?;
179        Ok(())
180    }
181
182    pub fn begin_transaction(&self) -> Result<(), MutationExecutorError> {
183        self.lock()?.execute("BEGIN IMMEDIATE", [])?;
184        Ok(())
185    }
186
187    pub fn commit_transaction(&self) -> Result<(), MutationExecutorError> {
188        self.lock()?.execute("COMMIT", [])?;
189        Ok(())
190    }
191
192    pub fn rollback_transaction(&self) -> Result<(), MutationExecutorError> {
193        self.lock()?.execute("ROLLBACK", [])?;
194        Ok(())
195    }
196
197    pub fn execute(&self, query: &CompiledQuery) -> Result<u64, MutationExecutorError> {
198        let params = bind_values(&query.params)?;
199        let rows = self
200            .lock()?
201            .execute(&query.sql_with_comment(), params_from_iter(params.iter()))?;
202        Ok(rows as u64)
203    }
204
205    pub fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, MutationExecutorError> {
206        let params = bind_values(&query.params)?;
207        let connection = self.lock()?;
208        let mut statement = connection.prepare(&query.sql_with_comment())?;
209        let columns = statement_columns(&statement);
210        let mut rows = statement.query(params_from_iter(params.iter()))?;
211        let mut records = Vec::new();
212        while let Some(row) = rows.next()? {
213            records.push(decode_sqlite_row(row, &columns)?);
214        }
215        Ok(records)
216    }
217
218    /// Fetch rows in streaming mode (chunked).
219    /// Returns a Vec of StreamChunk, each containing up to `chunk_size` rows.
220    pub fn fetch_stream(
221        &self,
222        query: &CompiledQuery,
223        chunk_size: usize,
224    ) -> Result<Vec<teaql_data_service::StreamChunk>, MutationExecutorError> {
225        let params = bind_values(&query.params)?;
226        let connection = self.lock()?;
227        let mut statement = connection.prepare(&query.sql_with_comment())?;
228        let columns = statement_columns(&statement);
229        let mut rows = statement.query(params_from_iter(params.iter()))?;
230
231        let mut chunks = Vec::new();
232        let mut current_chunk = Vec::new();
233        let mut chunk_index = 0;
234
235        while let Some(row) = rows.next()? {
236            current_chunk.push(decode_sqlite_row(row, &columns)?);
237            if current_chunk.len() >= chunk_size {
238                chunks.push(teaql_data_service::StreamChunk {
239                    rows: current_chunk,
240                    chunk_index,
241                    is_last: false,
242                });
243                current_chunk = Vec::new();
244                chunk_index += 1;
245            }
246        }
247
248        // Push the final chunk (may be empty if exactly aligned)
249        chunks.push(teaql_data_service::StreamChunk {
250            rows: current_chunk,
251            chunk_index,
252            is_last: true,
253        });
254
255        Ok(chunks)
256    }
257
258    pub fn table_exists(&self, table_name: &str) -> Result<bool, MutationExecutorError> {
259        let exists: i64 = self.lock()?.query_row(
260            "SELECT COUNT(1) FROM sqlite_master WHERE type = 'table' AND name = ?",
261            [table_name],
262            |row| row.get(0),
263        )?;
264        Ok(exists > 0)
265    }
266
267    pub fn table_columns(&self, table_name: &str) -> Result<BTreeSet<String>, MutationExecutorError> {
268        let pragma_sql = format!("PRAGMA table_info({})", quote_ident(table_name));
269        let connection = self.lock()?;
270        let mut statement = connection.prepare(&pragma_sql)?;
271        let rows = statement.query_map([], |row| row.get::<_, String>("name"))?;
272        let mut columns = BTreeSet::new();
273        for row in rows {
274            columns.insert(row?.to_lowercase());
275        }
276        Ok(columns)
277    }
278
279    fn lock(&self) -> Result<MutexGuard<'_, Connection>, MutationExecutorError> {
280        self.connection
281            .lock()
282            .map_err(|err| MutationExecutorError::Lock(err.to_string()))
283    }
284}
285
286
287impl teaql_data_service::DataServiceExecutor for SqliteMutationExecutor {
288    type Error = MutationExecutorError;
289
290    fn capabilities(&self) -> teaql_data_service::DataServiceCapabilities {
291        teaql_data_service::DataServiceCapabilities {
292            query: true,
293            mutation: true,
294            transaction: true,
295            schema: true,
296            id_generation: true,
297            ..Default::default()
298        }
299    }
300}
301
302impl SqlTransport for SqliteMutationExecutor {
303    type Error = MutationExecutorError;
304
305    async fn fetch_all_sql(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
306        SqliteMutationExecutor::fetch_all(self, query)
307    }
308
309    async fn execute_sql(&self, query: &CompiledQuery) -> Result<u64, Self::Error> {
310        SqliteMutationExecutor::execute(self, query)
311    }
312}
313
314impl teaql_data_service::StreamQueryExecutor for SqliteMutationExecutor {
315    async fn query_stream(
316        &self,
317        request: teaql_data_service::QueryRequest,
318        chunk_size: usize,
319    ) -> Result<Vec<teaql_data_service::StreamChunk>, Self::Error> {
320        let dialect = SqliteDialect;
321        // Use a dummy entity descriptor for compilation
322        let entity_desc = teaql_core::EntityDescriptor::new(&request.query.entity);
323        let compiled = dialect.compile_select(&entity_desc, &request.query)
324            .map_err(MutationExecutorError::SqlCompile)?;
325        SqliteMutationExecutor::fetch_stream(self, &compiled, chunk_size)
326    }
327}
328
329impl teaql_sql::SqlTransaction for SqliteMutationExecutor {
330    type Error = MutationExecutorError;
331
332    async fn commit_sql(self) -> Result<(), Self::Error> {
333        self.commit_transaction()
334    }
335
336    async fn rollback_sql(self) -> Result<(), Self::Error> {
337        self.rollback_transaction()
338    }
339}
340
341impl teaql_sql::SqlTransactionTransport for SqliteMutationExecutor {
342    type Tx<'a> = Self where Self: 'a;
343
344    async fn begin_sql(&self) -> Result<Self::Tx<'_>, Self::Error> {
345        self.begin_transaction()?;
346        Ok(self.clone())
347    }
348}
349
350
351
352fn initial_graph_exists_sqlite(
353    executor: &SqliteMutationExecutor,
354    dialect: &SqliteDialect,
355    entity: &EntityDescriptor,
356    graph: &GraphNode,
357) -> Result<bool, MutationExecutorError> {
358    let Some(id) = graph.values.get("id") else {
359        return Ok(false);
360    };
361    let query = dialect.compile_select(
362        entity,
363        &SelectQuery::new(&graph.entity)
364            .project("id")
365            .filter(Expr::eq("id", id.clone()))
366            .limit(1),
367    )?;
368    Ok(!executor.fetch_all(&query)?.is_empty())
369}
370
371fn compile_initial_graph_insert(
372    dialect: &impl SqlDialect,
373    entity: &EntityDescriptor,
374    graph: &GraphNode,
375) -> Result<CompiledQuery, MutationExecutorError> {
376    let mut command = InsertCommand::new(&graph.entity);
377    for (field, value) in &graph.values {
378        command = command.value(field.clone(), value.clone());
379    }
380    dialect.compile_insert(entity, &command).map_err(Into::into)
381}
382
383fn compile_initial_graph_update(
384    dialect: &impl SqlDialect,
385    entity: &EntityDescriptor,
386    graph: &GraphNode,
387) -> Result<Option<CompiledQuery>, MutationExecutorError> {
388    let Some(id) = graph.values.get("id") else {
389        return Ok(None);
390    };
391    let mut command = UpdateCommand::new(&graph.entity, id.clone());
392    for (field, value) in &graph.values {
393        if field == "id" {
394            continue;
395        }
396        command = command.value(field.clone(), value.clone());
397    }
398    match dialect.compile_update(entity, &command) {
399        Ok(query) => Ok(Some(query)),
400        Err(SqlCompileError::EmptyMutation(_)) => Ok(None),
401        Err(err) => Err(err.into()),
402    }
403}
404
405pub trait SqliteSchemaExt {
406    fn ensure_sqlite_schema(
407        &self,
408    ) -> Pin<Box<dyn Future<Output = Result<(), MutationExecutorError>> + Send + '_>>;
409}
410
411pub fn ensure_sqlite_schema_for(ctx: &UserContext) -> Result<(), MutationExecutorError> {
412    let dialect = ctx.get_resource::<SqliteDialect>().ok_or_else(|| {
413        MutationExecutorError::Bind("missing typed resource: SqliteDialect".to_owned())
414    })?;
415    let executor = ctx
416        .get_resource::<SqliteMutationExecutor>()
417        .ok_or_else(|| {
418            MutationExecutorError::Bind(
419                "missing typed resource: SqliteMutationExecutor".to_owned(),
420            )
421        })?;
422
423    let entities = ctx.all_entities();
424
425    // Ensure id space table exists
426    executor.ensure_id_space_table(DEFAULT_ID_SPACE_TABLE)?;
427
428    // Process each entity table individually with granular events
429    for entity in &entities {
430        let field_count = entity.properties.len();
431        if !executor.table_exists(&entity.table_name)? {
432            // New table: create it
433            let sql = dialect.compile_create_table(entity)?;
434            executor.lock()?.execute(&sql, [])?;
435            let _ = ctx.send_event(RawAuditEvent::schema_created(
436                &entity.name,
437                &entity.table_name,
438                field_count,
439            ));
440        } else {
441            // Existing table: check for missing columns
442            let existing_columns = executor.table_columns(&entity.table_name)?;
443            let mut fields_added = 0;
444            for property in &entity.properties {
445                let bare_column = strip_identifier_quotes(&property.column_name).to_lowercase();
446                if existing_columns.contains(&bare_column) {
447                    continue;
448                }
449                let sql = dialect.compile_add_column(entity, property)?;
450                executor.lock()?.execute(&sql, [])?;
451                let _ = ctx.send_event(RawAuditEvent::field_added(
452                    &entity.name,
453                    &entity.table_name,
454                    &property.column_name,
455                ));
456                fields_added += 1;
457            }
458            let _ = ctx.send_event(RawAuditEvent::schema_verified(
459                &entity.name,
460                &entity.table_name,
461                field_count,
462            ));
463            let _ = fields_added; // used above for FieldAdded events
464        }
465    }
466
467    // Seed initial data, tracking insert vs update counts per entity
468    let mut seed_counts: BTreeMap<String, (usize, usize)> = BTreeMap::new(); // (inserted, updated)
469    for graph in ctx.initial_graphs() {
470        let entity = ctx.entity(&graph.entity).ok_or_else(|| {
471            MutationExecutorError::Bind(format!("missing entity: {}", graph.entity))
472        })?;
473        let counts = seed_counts.entry(graph.entity.clone()).or_insert((0, 0));
474        if initial_graph_exists_sqlite(executor, dialect, entity, graph)? {
475            if let Some(query) = compile_initial_graph_update(dialect, entity, graph)? {
476                executor.execute(&query)?;
477            }
478            counts.1 += 1; // updated
479        } else {
480            let query = compile_initial_graph_insert(dialect, entity, graph)?;
481            executor.execute(&query)?;
482            counts.0 += 1; // inserted
483        }
484    }
485
486    // Fire DataSeeded events per entity type
487    for (entity_name, (inserted, updated)) in &seed_counts {
488        let entity = ctx.entity(entity_name).ok_or_else(|| {
489            MutationExecutorError::Bind(format!("missing entity: {}", entity_name))
490        })?;
491        let _ = ctx.send_event(RawAuditEvent::data_seeded(
492            entity_name,
493            &entity.table_name,
494            *inserted,
495            *updated,
496        ));
497    }
498
499    Ok(())
500}
501
502impl SqliteSchemaExt for UserContext {
503    fn ensure_sqlite_schema(
504        &self,
505    ) -> Pin<Box<dyn Future<Output = Result<(), MutationExecutorError>> + Send + '_>> {
506        Box::pin(async move { ensure_sqlite_schema_for(self) })
507    }
508}
509
510#[derive(Debug, Default, Clone, Copy)]
511pub struct SqliteSchemaProvider;
512
513impl SchemaProvider for SqliteSchemaProvider {
514    fn ensure_schema<'a>(
515        &'a self,
516        ctx: &'a UserContext,
517    ) -> Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>> {
518        Box::pin(async move {
519            ensure_sqlite_schema_for(ctx).map_err(|err| RuntimeError::Schema(err.to_string()))
520        })
521    }
522}
523
524pub trait SqliteProviderExt {
525    fn use_sqlite_provider(&mut self, executor: SqliteMutationExecutor) -> &mut Self;
526}
527
528impl SqliteProviderExt for UserContext {
529    fn use_sqlite_provider(&mut self, executor: SqliteMutationExecutor) -> &mut Self {
530        self.insert_resource(SqliteDialect);
531        self.insert_resource(executor);
532        self.set_schema_provider(SqliteSchemaProvider);
533        self
534    }
535}
536
537#[derive(Clone)]
538pub struct SqliteIdSpaceGenerator {
539    executor: SqliteMutationExecutor,
540    table_name: String,
541}
542
543impl SqliteIdSpaceGenerator {
544    pub fn new(connection: Connection) -> Self {
545        Self::from_executor(SqliteMutationExecutor::from_connection(connection))
546    }
547
548    pub fn from_executor(executor: SqliteMutationExecutor) -> Self {
549        Self {
550            executor,
551            table_name: DEFAULT_ID_SPACE_TABLE.to_owned(),
552        }
553    }
554
555    pub fn with_table_name(mut self, table_name: impl Into<String>) -> Self {
556        self.table_name = table_name.into();
557        self
558    }
559
560    pub fn ensure_table(&self) -> Result<(), MutationExecutorError> {
561        self.executor.ensure_id_space_table(&self.table_name)
562    }
563
564    pub fn next_id(&self, entity: &str) -> Result<u64, MutationExecutorError> {
565        self.ensure_table()?;
566        let sql = format!(
567            "INSERT INTO {} (type_name, current_level) VALUES (?, 1) \
568             ON CONFLICT (type_name) DO UPDATE \
569             SET current_level = current_level + 1 \
570             RETURNING current_level",
571            quote_ident(&self.table_name)
572        );
573        let id: i64 = self
574            .executor
575            .lock()?
576            .query_row(&sql, [entity], |row| row.get(0))?;
577        u64::try_from(id).map_err(|_| {
578            MutationExecutorError::Bind(format!("generated id {id} cannot be represented as u64"))
579        })
580    }
581}
582
583impl InternalIdGenerator for SqliteIdSpaceGenerator {
584    fn generate_id(&self, entity: &str) -> Result<u64, RuntimeError> {
585        self.next_id(entity)
586            .map_err(|err| RuntimeError::IdGeneration(err.to_string()))
587    }
588}
589
590fn quote_ident(ident: &str) -> String {
591    quote_identifier_if_needed(ident, '"')
592}
593
/// Remove one layer of wrapping quotes from a SQL identifier.
///
/// `PRAGMA table_info` reports bare column names (e.g. `description`), while
/// a `PropertyDescriptor::column_name` may arrive quoted (e.g.
/// `"description"`) when the name is a reserved keyword. Stripping the
/// wrapper lets the two be compared during schema migration.
///
/// Recognised wrappers are `"…"`, `` `…` `` and `[…]`. The identifier is
/// returned unchanged unless it both starts and ends with the matching pair.
fn strip_identifier_quotes(ident: &str) -> &str {
    const WRAPPERS: [(char, char); 3] = [('"', '"'), ('`', '`'), ('[', ']')];

    WRAPPERS
        .iter()
        .find_map(|&(open, close)| ident.strip_prefix(open)?.strip_suffix(close))
        .unwrap_or(ident)
}
614
615fn bind_values(values: &[Value]) -> Result<Vec<SqliteValue>, MutationExecutorError> {
616    values.iter().map(bind_sqlite_value).collect()
617}
618
619fn bind_sqlite_value(value: &Value) -> Result<SqliteValue, MutationExecutorError> {
620    match value {
621        Value::Null => Ok(SqliteValue::Null),
622        Value::Bool(v) => Ok(SqliteValue::Integer(i64::from(*v))),
623        Value::I64(v) => Ok(SqliteValue::Integer(*v)),
624        Value::U64(v) => i64::try_from(*v)
625            .map(SqliteValue::Integer)
626            .map_err(|_| MutationExecutorError::Bind(format!("u64 value {v} exceeds i64 range"))),
627        Value::F64(v) => Ok(SqliteValue::Real(*v)),
628        Value::Decimal(v) => Ok(SqliteValue::Text(format!("{SQLITE_DECIMAL_PREFIX}{v}"))),
629        Value::Text(v) => Ok(SqliteValue::Text(v.clone())),
630        Value::Json(v) => Ok(SqliteValue::Text(v.to_string())),
631        Value::Date(v) => Ok(SqliteValue::Text(v.format("%Y-%m-%d").to_string())),
632        Value::Timestamp(v) => Ok(SqliteValue::Text(v.to_rfc3339())),
633        Value::Object(_) => Err(MutationExecutorError::UnsupportedValue("object")),
634        Value::List(_) => Err(MutationExecutorError::UnsupportedValue("list")),
635    }
636}
637
638#[derive(Debug, Clone)]
639struct ColumnInfo {
640    name: String,
641    decl_type: Option<String>,
642}
643
644fn statement_columns(statement: &rusqlite::Statement<'_>) -> Vec<ColumnInfo> {
645    statement
646        .columns()
647        .into_iter()
648        .map(|column| ColumnInfo {
649            name: column.name().to_owned(),
650            decl_type: column.decl_type().map(|value| value.to_ascii_uppercase()),
651        })
652        .collect()
653}
654
655fn decode_sqlite_row(
656    row: &Row<'_>,
657    columns: &[ColumnInfo],
658) -> Result<Record, MutationExecutorError> {
659    let mut record = BTreeMap::new();
660    for (index, column) in columns.iter().enumerate() {
661        let value_ref = row.get_ref(index)?;
662        let value = match value_ref {
663            ValueRef::Null => Value::Null,
664            ValueRef::Integer(value) => decode_sqlite_integer(value, column),
665            ValueRef::Real(value) => Value::F64(value),
666            ValueRef::Text(value) => decode_sqlite_text(value, column)?,
667            ValueRef::Blob(_) => {
668                return Err(MutationExecutorError::UnsupportedColumnType(
669                    "BLOB".to_owned(),
670                ));
671            }
672        };
673        record.insert(column.name.clone(), value);
674    }
675    Ok(record)
676}
677
678fn decode_sqlite_integer(value: i64, column: &ColumnInfo) -> Value {
679    match column_decl_type(column).as_deref() {
680        Some("BOOLEAN") | Some("BOOL") => Value::Bool(value != 0),
681        _ => Value::I64(value),
682    }
683}
684
685fn decode_sqlite_text(value: &[u8], column: &ColumnInfo) -> Result<Value, MutationExecutorError> {
686    let value = std::str::from_utf8(value)
687        .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite text: {err}")))?;
688    if let Some(decimal) = value.strip_prefix(SQLITE_DECIMAL_PREFIX) {
689        return Decimal::from_str(decimal)
690            .map(Value::Decimal)
691            .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite decimal: {err}")));
692    }
693
694    match column_decl_type(column).as_deref() {
695        Some("NUMERIC") | Some("DECIMAL") => Decimal::from_str(value)
696            .map(Value::Decimal)
697            .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite decimal: {err}"))),
698        Some("JSON") => serde_json::from_str(value).map(Value::Json).map_err(|err| {
699            MutationExecutorError::Bind(format!("invalid sqlite json value: {err}"))
700        }),
701        Some("DATE") => NaiveDate::parse_from_str(value, "%Y-%m-%d")
702            .map(Value::Date)
703            .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite date: {err}"))),
704        Some("TIMESTAMP") | Some("DATETIME") => parse_sqlite_timestamp(value),
705        _ => infer_sqlite_text(value),
706    }
707}
708
709fn infer_sqlite_text(value: &str) -> Result<Value, MutationExecutorError> {
710    if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
711        return Ok(Value::Date(date));
712    }
713    if let Ok(timestamp) = DateTime::parse_from_rfc3339(value) {
714        return Ok(Value::Timestamp(timestamp.with_timezone(&Utc)));
715    }
716    if let Ok(timestamp) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
717        return Ok(Value::Timestamp(Utc.from_utc_datetime(&timestamp)));
718    }
719    Ok(Value::Text(value.to_owned()))
720}
721
722fn parse_sqlite_timestamp(value: &str) -> Result<Value, MutationExecutorError> {
723    if let Ok(timestamp) = DateTime::parse_from_rfc3339(value) {
724        return Ok(Value::Timestamp(timestamp.with_timezone(&Utc)));
725    }
726    NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S")
727        .map(|timestamp| Value::Timestamp(Utc.from_utc_datetime(&timestamp)))
728        .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite timestamp: {err}")))
729}
730
731fn column_decl_type(column: &ColumnInfo) -> Option<String> {
732    column
733        .decl_type
734        .as_ref()
735        .map(|value| value.split('(').next().unwrap_or(value).trim().to_owned())
736}
737
738#[cfg(test)]
739mod tests {
740    use super::*;
741    use teaql_core::{DeleteCommand, RecoverCommand};
742    use teaql_runtime::InMemoryMetadataStore;
743
744    fn entity() -> EntityDescriptor {
745        EntityDescriptor::new("Order")
746            .table_name("orders")
747            .property(
748                PropertyDescriptor::new("id", DataType::U64)
749                    .column_name("id")
750                    .id()
751                    .not_null(),
752            )
753            .property(
754                PropertyDescriptor::new("version", DataType::I64)
755                    .column_name("version")
756                    .version()
757                    .not_null(),
758            )
759            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
760    }
761
762    #[test]
763    fn sqlite_dialect_compiles_mutations_and_schema() {
764        let insert = SqliteDialect
765            .compile_insert(
766                &entity(),
767                &InsertCommand::new("Order")
768                    .value("id", 1_u64)
769                    .value("name", "A"),
770            )
771            .unwrap();
772        assert_eq!(insert.sql, "INSERT INTO orders (id, name) VALUES (?, ?)");
773
774        let update = SqliteDialect
775            .compile_update(
776                &entity(),
777                &UpdateCommand::new("Order", 1_u64)
778                    .expected_version(3)
779                    .value("name", "B"),
780            )
781            .unwrap();
782        assert_eq!(
783            update.sql,
784            "UPDATE orders SET name = ?, version = ? WHERE id = ? AND version = ?"
785        );
786
787        let delete = SqliteDialect
788            .compile_delete(
789                &entity(),
790                &DeleteCommand::new("Order", 1_u64).expected_version(3),
791            )
792            .unwrap();
793        let recover = SqliteDialect
794            .compile_recover(&entity(), &RecoverCommand::new("Order", 1_u64, -4))
795            .unwrap();
796        assert_eq!(
797            delete.sql,
798            "UPDATE orders SET version = ? WHERE id = ? AND version = ?"
799        );
800        assert_eq!(
801            recover.sql,
802            "UPDATE orders SET version = ? WHERE id = ? AND version = ?"
803        );
804
805        let create = SqliteDialect.compile_create_table(&entity()).unwrap();
806        assert_eq!(
807            create,
808            "CREATE TABLE IF NOT EXISTS orders (id INTEGER PRIMARY KEY NOT NULL, version INTEGER NOT NULL, name TEXT)"
809        );
810    }
811
812    #[test]
813    fn sqlite_executor_ensures_schema_and_roundtrips_rows() {
814        let executor = SqliteMutationExecutor::from_connection(Connection::open_in_memory().unwrap());
815        let entity = entity();
816        let mut ctx = UserContext::new()
817            .with_metadata(InMemoryMetadataStore::new().with_entity(entity.clone()));
818
819        ctx.use_sqlite_provider(executor.clone());
820        ensure_sqlite_schema_for(&ctx).unwrap();
821
822        let insert = SqliteDialect
823            .compile_insert(
824                &entity,
825                &InsertCommand::new("Order")
826                    .value("id", 1_u64)
827                    .value("version", 1_i64)
828                    .value("name", "draft"),
829            )
830            .unwrap();
831        assert_eq!(executor.execute(&insert).unwrap(), 1);
832
833        let select = SqliteDialect
834            .compile_select(
835                &entity,
836                &SelectQuery::new("Order")
837                    .filter(Expr::eq("id", 1_u64))
838                    .order_asc("id"),
839            )
840            .unwrap();
841        let rows = executor.fetch_all(&select).unwrap();
842        assert_eq!(rows.len(), 1);
843        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
844        assert_eq!(rows[0].get("version"), Some(&Value::I64(1)));
845        assert_eq!(rows[0].get("name"), Some(&Value::Text("draft".to_owned())));
846    }
847
848    #[test]
849    fn sqlite_executor_parses_json_only_for_json_columns() {
850        let executor = SqliteMutationExecutor::from_connection(Connection::open_in_memory().unwrap());
851
852        executor
853            .execute(&CompiledQuery {
854                sql: "CREATE TABLE payloads (text_payload TEXT, json_payload JSON)".to_owned(),
855                params: Vec::new(),
856                comment: None,
857            })
858            .unwrap();
859        executor
860            .execute(&CompiledQuery {
861                sql: "INSERT INTO payloads (text_payload, json_payload) VALUES (?, ?)".to_owned(),
862                params: vec![
863                    Value::Text("{\"active\":true}".to_owned()),
864                    Value::Json(serde_json::json!({"active": true})),
865                ],
866                comment: None,
867            })
868            .unwrap();
869
870        let rows = executor
871            .fetch_all(&CompiledQuery {
872                sql: "SELECT text_payload, json_payload FROM payloads".to_owned(),
873                params: Vec::new(),
874                comment: None,
875            })
876            .unwrap();
877
878        assert_eq!(
879            rows[0].get("text_payload"),
880            Some(&Value::Text("{\"active\":true}".to_owned()))
881        );
882        assert_eq!(
883            rows[0].get("json_payload"),
884            Some(&Value::Json(serde_json::json!({"active": true})))
885        );
886    }
887
888    #[test]
889    fn sqlite_id_space_generator_increments_ids() {
890        let executor = SqliteMutationExecutor::from_connection(Connection::open_in_memory().unwrap());
891        let generator = SqliteIdSpaceGenerator::from_executor(executor);
892        assert_eq!(generator.next_id("Order").unwrap(), 1);
893        assert_eq!(generator.next_id("Order").unwrap(), 2);
894    }
895
896    #[test]
897    fn sqlite_fetch_stream_returns_chunked_rows() {
898        let executor = SqliteMutationExecutor::new(Connection::open_in_memory().unwrap());
899        let entity = entity();
900
901        // Create table and insert 25 rows
902        executor.execute(&CompiledQuery {
903            sql: "CREATE TABLE orders (id INTEGER PRIMARY KEY, version INTEGER, name TEXT)".to_owned(),
904            params: Vec::new(),
905            comment: None,
906        }).unwrap();
907
908        for i in 1..=25 {
909            let insert = SqliteDialect
910                .compile_insert(
911                    &entity,
912                    &InsertCommand::new("Order")
913                        .value("id", i as u64)
914                        .value("version", 1_i64)
915                        .value("name", format!("order-{i}")),
916                )
917                .unwrap();
918            executor.execute(&insert).unwrap();
919        }
920
921        // Stream with chunk_size = 10
922        let query = SelectQuery::new("Order")
923            .filter(Expr::gt("version", 0_i64))
924            .order_asc("id")
925            .stream(10);
926
927        let compiled = SqliteDialect
928            .compile_select(&entity, &query)
929            .unwrap();
930
931        let chunks = executor.fetch_stream(&compiled, 10).unwrap();
932
933        // 25 rows / 10 per chunk = 3 chunks
934        assert_eq!(chunks.len(), 3);
935        assert_eq!(chunks[0].rows.len(), 10);
936        assert_eq!(chunks[0].chunk_index, 0);
937        assert!(!chunks[0].is_last);
938
939        assert_eq!(chunks[1].rows.len(), 10);
940        assert_eq!(chunks[1].chunk_index, 1);
941        assert!(!chunks[1].is_last);
942
943        assert_eq!(chunks[2].rows.len(), 5);
944        assert_eq!(chunks[2].chunk_index, 2);
945        assert!(chunks[2].is_last);
946
947        // Verify first and last row
948        assert_eq!(chunks[0].rows[0].get("name"), Some(&Value::Text("order-1".to_owned())));
949        assert_eq!(chunks[2].rows[4].get("name"), Some(&Value::Text("order-25".to_owned())));
950    }
951
952    #[test]
953    fn sqlite_fetch_stream_handles_empty_result() {
954        let executor = SqliteMutationExecutor::new(Connection::open_in_memory().unwrap());
955
956        executor.execute(&CompiledQuery {
957            sql: "CREATE TABLE orders (id INTEGER PRIMARY KEY, version INTEGER, name TEXT)".to_owned(),
958            params: Vec::new(),
959            comment: None,
960        }).unwrap();
961
962        let entity = entity();
963        let query = SelectQuery::new("Order")
964            .filter(Expr::gt("version", 0_i64))
965            .stream(10);
966
967        let compiled = SqliteDialect
968            .compile_select(&entity, &query)
969            .unwrap();
970
971        let chunks = executor.fetch_stream(&compiled, 10).unwrap();
972
973        // Empty result = 1 chunk with 0 rows, marked as last
974        assert_eq!(chunks.len(), 1);
975        assert_eq!(chunks[0].rows.len(), 0);
976        assert!(chunks[0].is_last);
977    }
978
979    #[test]
980    fn sqlite_fetch_stream_exact_chunk_boundary() {
981        let executor = SqliteMutationExecutor::new(Connection::open_in_memory().unwrap());
982        let entity = entity();
983
984        executor.execute(&CompiledQuery {
985            sql: "CREATE TABLE orders (id INTEGER PRIMARY KEY, version INTEGER, name TEXT)".to_owned(),
986            params: Vec::new(),
987            comment: None,
988        }).unwrap();
989
990        // Insert exactly 20 rows
991        for i in 1..=20 {
992            let insert = SqliteDialect
993                .compile_insert(
994                    &entity,
995                    &InsertCommand::new("Order")
996                        .value("id", i as u64)
997                        .value("version", 1_i64)
998                        .value("name", format!("order-{i}")),
999                )
1000                .unwrap();
1001            executor.execute(&insert).unwrap();
1002        }
1003
1004        let query = SelectQuery::new("Order")
1005            .filter(Expr::gt("version", 0_i64))
1006            .order_asc("id")
1007            .stream(10);
1008
1009        let compiled = SqliteDialect
1010            .compile_select(&entity, &query)
1011            .unwrap();
1012
1013        let chunks = executor.fetch_stream(&compiled, 10).unwrap();
1014
1015        // 20 rows / 10 per chunk = 2 full chunks + 1 empty final chunk
1016        assert_eq!(chunks.len(), 3);
1017        assert_eq!(chunks[0].rows.len(), 10);
1018        assert!(!chunks[0].is_last);
1019        assert_eq!(chunks[1].rows.len(), 10);
1020        assert!(!chunks[1].is_last);
1021        assert_eq!(chunks[2].rows.len(), 0);
1022        assert!(chunks[2].is_last);
1023    }
1024}