1use std::collections::{BTreeMap, BTreeSet};
2use std::future::Future;
3use std::pin::Pin;
4use std::str::FromStr;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
8use rusqlite::types::{Value as SqliteValue, ValueRef};
9use rusqlite::{Connection, Row, params_from_iter};
10use rust_decimal::Decimal;
11use teaql_core::{
12 DataType, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record, SelectQuery,
13 UpdateCommand, Value,
14};
15use teaql_runtime::{
16 RawAuditEvent, GraphNode, InternalIdGenerator,
17 RuntimeError, SchemaProvider, UserContext,
18};
19use teaql_sql::{
20 CompiledQuery, DatabaseKind, SqlCompileError, SqlDialect,
21 SqlTransport, quote_identifier_if_needed,
22};
23
/// Default name of the id-space table: the table created by
/// `ensure_id_space_table` during schema setup and used by
/// `SqliteIdSpaceGenerator` unless another name is configured.
pub const DEFAULT_ID_SPACE_TABLE: &str = "teaql_id_space";

/// Marker prepended to decimal values when they are bound as SQLite text and
/// stripped again when text values are decoded back into decimals.
const SQLITE_DECIMAL_PREFIX: &str = "__teaql_decimal__:";
27
/// SQLite implementation marker for the `SqlDialect` trait: a field-less,
/// copyable type whose trait impl supplies SQLite quoting, placeholders and
/// column type names.
#[derive(Debug, Default, Clone, Copy)]
pub struct SqliteDialect;
30
31impl SqlDialect for SqliteDialect {
32 fn kind(&self) -> DatabaseKind {
33 DatabaseKind::Sqlite
34 }
35
36 fn quote_ident(&self, ident: &str) -> String {
37 quote_ident(ident)
38 }
39
40 fn placeholder(&self, _index: usize) -> String {
41 "?".to_owned()
42 }
43
44 fn schema_type_sql(
45 &self,
46 data_type: DataType,
47 property: &PropertyDescriptor,
48 ) -> Result<&'static str, SqlCompileError> {
49 match data_type {
50 DataType::Bool => Ok("INTEGER"),
51 DataType::I64 | DataType::U64 if property.is_id => Ok("INTEGER"),
52 DataType::I64 | DataType::U64 => Ok("INTEGER"),
53 DataType::F64 => Ok("REAL"),
54 DataType::Decimal => Ok("NUMERIC"),
55 DataType::Text => Ok("TEXT"),
56 DataType::Json => Ok("JSON"),
57 DataType::Date => Ok("DATE"),
58 DataType::Timestamp => Ok("TIMESTAMP"),
59 }
60 }
61
62 fn compile_add_column(
63 &self,
64 entity: &EntityDescriptor,
65 property: &PropertyDescriptor,
66 ) -> Result<String, SqlCompileError> {
67 let def = self.column_definition_sql(property)?;
71 let def_without_not_null = def.replace(" NOT NULL", "");
72
73 Ok(format!(
74 "ALTER TABLE {} ADD COLUMN {}",
75 self.quote_ident(&entity.table_name),
76 def_without_not_null
77 ))
78 }
79}
80
/// Errors produced by the SQLite executor, schema and id-generation helpers
/// in this module.
#[derive(Debug)]
pub enum MutationExecutorError {
    /// Error reported by `rusqlite`.
    Sqlite(rusqlite::Error),
    /// Error raised while compiling SQL through the dialect.
    SqlCompile(SqlCompileError),
    /// A value kind that cannot be bound as a statement parameter.
    UnsupportedValue(&'static str),
    /// A stored column value whose type cannot be decoded into a record value.
    UnsupportedColumnType(String),
    /// A value conversion failed, or a required resource or entity was missing.
    Bind(String),
    /// The shared connection mutex could not be locked.
    Lock(String),
}
90
91impl std::fmt::Display for MutationExecutorError {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 match self {
94 Self::Sqlite(err) => err.fmt(f),
95 Self::SqlCompile(err) => err.fmt(f),
96 Self::UnsupportedValue(kind) => {
97 write!(
98 f,
99 "unsupported rusqlite bind value for mutation executor: {kind}"
100 )
101 }
102 Self::UnsupportedColumnType(kind) => {
103 write!(
104 f,
105 "unsupported rusqlite column type for record decoding: {kind}"
106 )
107 }
108 Self::Bind(message) => write!(f, "rusqlite bind error: {message}"),
109 Self::Lock(message) => write!(f, "rusqlite connection lock error: {message}"),
110 }
111 }
112}
113
// Marks the type as a standard error. No `source()` override is provided, so
// wrapped errors are only visible through the `Display` output.
impl std::error::Error for MutationExecutorError {}
115
116impl From<rusqlite::Error> for MutationExecutorError {
117 fn from(value: rusqlite::Error) -> Self {
118 Self::Sqlite(value)
119 }
120}
121
122impl From<SqlCompileError> for MutationExecutorError {
123 fn from(value: SqlCompileError) -> Self {
124 Self::SqlCompile(value)
125 }
126}
127
/// Executes compiled queries against a SQLite connection held behind
/// `Arc<Mutex<_>>`; cloning the executor clones the handle, so all clones
/// use the same connection.
#[derive(Clone)]
pub struct SqliteMutationExecutor {
    // Shared handle to the underlying rusqlite connection.
    connection: Arc<Mutex<Connection>>,
}
132
133impl SqliteMutationExecutor {
134 pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
135 Self { connection }
136 }
137
138 pub fn from_connection(connection: Connection) -> Self {
139 Self::new(Arc::new(Mutex::new(connection)))
140 }
141
142 pub fn connection(&self) -> Arc<Mutex<Connection>> {
143 Arc::clone(&self.connection)
144 }
145
146 pub fn ensure_schema(
147 &self,
148 dialect: &SqliteDialect,
149 entities: &[&EntityDescriptor],
150 ) -> Result<(), MutationExecutorError> {
151 self.ensure_id_space_table(DEFAULT_ID_SPACE_TABLE)?;
152
153 for entity in entities {
154 if !self.table_exists(&entity.table_name)? {
155 let sql = dialect.compile_create_table(entity)?;
156 self.lock()?.execute(&sql, [])?;
157 continue;
158 }
159
160 let existing_columns = self.table_columns(&entity.table_name)?;
161 for property in &entity.properties {
162 let bare_column = strip_identifier_quotes(&property.column_name).to_lowercase();
163 if existing_columns.contains(&bare_column) {
164 continue;
165 }
166 let sql = dialect.compile_add_column(entity, property)?;
167 self.lock()?.execute(&sql, [])?;
168 }
169 }
170 Ok(())
171 }
172
173 pub fn ensure_id_space_table(&self, table_name: &str) -> Result<(), MutationExecutorError> {
174 let sql = format!(
175 "CREATE TABLE IF NOT EXISTS {} (type_name VARCHAR(100) PRIMARY KEY, current_level BIGINT NOT NULL)",
176 quote_ident(table_name)
177 );
178 self.lock()?.execute(&sql, [])?;
179 Ok(())
180 }
181
182 pub fn begin_transaction(&self) -> Result<(), MutationExecutorError> {
183 self.lock()?.execute("BEGIN IMMEDIATE", [])?;
184 Ok(())
185 }
186
187 pub fn commit_transaction(&self) -> Result<(), MutationExecutorError> {
188 self.lock()?.execute("COMMIT", [])?;
189 Ok(())
190 }
191
192 pub fn rollback_transaction(&self) -> Result<(), MutationExecutorError> {
193 self.lock()?.execute("ROLLBACK", [])?;
194 Ok(())
195 }
196
197 pub fn execute(&self, query: &CompiledQuery) -> Result<u64, MutationExecutorError> {
198 let params = bind_values(&query.params)?;
199 let rows = self
200 .lock()?
201 .execute(&query.sql_with_comment(), params_from_iter(params.iter()))?;
202 Ok(rows as u64)
203 }
204
205 pub fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, MutationExecutorError> {
206 let params = bind_values(&query.params)?;
207 let connection = self.lock()?;
208 let mut statement = connection.prepare(&query.sql_with_comment())?;
209 let columns = statement_columns(&statement);
210 let mut rows = statement.query(params_from_iter(params.iter()))?;
211 let mut records = Vec::new();
212 while let Some(row) = rows.next()? {
213 records.push(decode_sqlite_row(row, &columns)?);
214 }
215 Ok(records)
216 }
217
218 pub fn fetch_stream(
221 &self,
222 query: &CompiledQuery,
223 chunk_size: usize,
224 ) -> Result<Vec<teaql_data_service::StreamChunk>, MutationExecutorError> {
225 let params = bind_values(&query.params)?;
226 let connection = self.lock()?;
227 let mut statement = connection.prepare(&query.sql_with_comment())?;
228 let columns = statement_columns(&statement);
229 let mut rows = statement.query(params_from_iter(params.iter()))?;
230
231 let mut chunks = Vec::new();
232 let mut current_chunk = Vec::new();
233 let mut chunk_index = 0;
234
235 while let Some(row) = rows.next()? {
236 current_chunk.push(decode_sqlite_row(row, &columns)?);
237 if current_chunk.len() >= chunk_size {
238 chunks.push(teaql_data_service::StreamChunk {
239 rows: current_chunk,
240 chunk_index,
241 is_last: false,
242 });
243 current_chunk = Vec::new();
244 chunk_index += 1;
245 }
246 }
247
248 chunks.push(teaql_data_service::StreamChunk {
250 rows: current_chunk,
251 chunk_index,
252 is_last: true,
253 });
254
255 Ok(chunks)
256 }
257
258 pub fn table_exists(&self, table_name: &str) -> Result<bool, MutationExecutorError> {
259 let exists: i64 = self.lock()?.query_row(
260 "SELECT COUNT(1) FROM sqlite_master WHERE type = 'table' AND name = ?",
261 [table_name],
262 |row| row.get(0),
263 )?;
264 Ok(exists > 0)
265 }
266
267 pub fn table_columns(&self, table_name: &str) -> Result<BTreeSet<String>, MutationExecutorError> {
268 let pragma_sql = format!("PRAGMA table_info({})", quote_ident(table_name));
269 let connection = self.lock()?;
270 let mut statement = connection.prepare(&pragma_sql)?;
271 let rows = statement.query_map([], |row| row.get::<_, String>("name"))?;
272 let mut columns = BTreeSet::new();
273 for row in rows {
274 columns.insert(row?.to_lowercase());
275 }
276 Ok(columns)
277 }
278
279 fn lock(&self) -> Result<MutexGuard<'_, Connection>, MutationExecutorError> {
280 self.connection
281 .lock()
282 .map_err(|err| MutationExecutorError::Lock(err.to_string()))
283 }
284}
285
286
287impl teaql_data_service::DataServiceExecutor for SqliteMutationExecutor {
288 type Error = MutationExecutorError;
289
290 fn capabilities(&self) -> teaql_data_service::DataServiceCapabilities {
291 teaql_data_service::DataServiceCapabilities {
292 query: true,
293 mutation: true,
294 transaction: true,
295 schema: true,
296 id_generation: true,
297 ..Default::default()
298 }
299 }
300}
301
302impl SqlTransport for SqliteMutationExecutor {
303 type Error = MutationExecutorError;
304
305 async fn fetch_all_sql(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
306 SqliteMutationExecutor::fetch_all(self, query)
307 }
308
309 async fn execute_sql(&self, query: &CompiledQuery) -> Result<u64, Self::Error> {
310 SqliteMutationExecutor::execute(self, query)
311 }
312}
313
314impl teaql_data_service::StreamQueryExecutor for SqliteMutationExecutor {
315 async fn query_stream(
316 &self,
317 request: teaql_data_service::QueryRequest,
318 chunk_size: usize,
319 ) -> Result<Vec<teaql_data_service::StreamChunk>, Self::Error> {
320 let dialect = SqliteDialect;
321 let entity_desc = teaql_core::EntityDescriptor::new(&request.query.entity);
323 let compiled = dialect.compile_select(&entity_desc, &request.query)
324 .map_err(MutationExecutorError::SqlCompile)?;
325 SqliteMutationExecutor::fetch_stream(self, &compiled, chunk_size)
326 }
327}
328
329impl teaql_sql::SqlTransaction for SqliteMutationExecutor {
330 type Error = MutationExecutorError;
331
332 async fn commit_sql(self) -> Result<(), Self::Error> {
333 self.commit_transaction()
334 }
335
336 async fn rollback_sql(self) -> Result<(), Self::Error> {
337 self.rollback_transaction()
338 }
339}
340
341impl teaql_sql::SqlTransactionTransport for SqliteMutationExecutor {
342 type Tx<'a> = Self where Self: 'a;
343
344 async fn begin_sql(&self) -> Result<Self::Tx<'_>, Self::Error> {
345 self.begin_transaction()?;
346 Ok(self.clone())
347 }
348}
349
350
351
352fn initial_graph_exists_sqlite(
353 executor: &SqliteMutationExecutor,
354 dialect: &SqliteDialect,
355 entity: &EntityDescriptor,
356 graph: &GraphNode,
357) -> Result<bool, MutationExecutorError> {
358 let Some(id) = graph.values.get("id") else {
359 return Ok(false);
360 };
361 let query = dialect.compile_select(
362 entity,
363 &SelectQuery::new(&graph.entity)
364 .project("id")
365 .filter(Expr::eq("id", id.clone()))
366 .limit(1),
367 )?;
368 Ok(!executor.fetch_all(&query)?.is_empty())
369}
370
371fn compile_initial_graph_insert(
372 dialect: &impl SqlDialect,
373 entity: &EntityDescriptor,
374 graph: &GraphNode,
375) -> Result<CompiledQuery, MutationExecutorError> {
376 let mut command = InsertCommand::new(&graph.entity);
377 for (field, value) in &graph.values {
378 command = command.value(field.clone(), value.clone());
379 }
380 dialect.compile_insert(entity, &command).map_err(Into::into)
381}
382
383fn compile_initial_graph_update(
384 dialect: &impl SqlDialect,
385 entity: &EntityDescriptor,
386 graph: &GraphNode,
387) -> Result<Option<CompiledQuery>, MutationExecutorError> {
388 let Some(id) = graph.values.get("id") else {
389 return Ok(None);
390 };
391 let mut command = UpdateCommand::new(&graph.entity, id.clone());
392 for (field, value) in &graph.values {
393 if field == "id" {
394 continue;
395 }
396 command = command.value(field.clone(), value.clone());
397 }
398 match dialect.compile_update(entity, &command) {
399 Ok(query) => Ok(Some(query)),
400 Err(SqlCompileError::EmptyMutation(_)) => Ok(None),
401 Err(err) => Err(err.into()),
402 }
403}
404
/// Extension trait exposing SQLite schema setup as a boxed future.
pub trait SqliteSchemaExt {
    /// Returns a boxed `Send` future that borrows `self` and resolves to
    /// `Ok(())` or a `MutationExecutorError`.
    fn ensure_sqlite_schema(
        &self,
    ) -> Pin<Box<dyn Future<Output = Result<(), MutationExecutorError>> + Send + '_>>;
}
410
411pub fn ensure_sqlite_schema_for(ctx: &UserContext) -> Result<(), MutationExecutorError> {
412 let dialect = ctx.get_resource::<SqliteDialect>().ok_or_else(|| {
413 MutationExecutorError::Bind("missing typed resource: SqliteDialect".to_owned())
414 })?;
415 let executor = ctx
416 .get_resource::<SqliteMutationExecutor>()
417 .ok_or_else(|| {
418 MutationExecutorError::Bind(
419 "missing typed resource: SqliteMutationExecutor".to_owned(),
420 )
421 })?;
422
423 let entities = ctx.all_entities();
424
425 executor.ensure_id_space_table(DEFAULT_ID_SPACE_TABLE)?;
427
428 for entity in &entities {
430 let field_count = entity.properties.len();
431 if !executor.table_exists(&entity.table_name)? {
432 let sql = dialect.compile_create_table(entity)?;
434 executor.lock()?.execute(&sql, [])?;
435 let _ = ctx.send_event(RawAuditEvent::schema_created(
436 &entity.name,
437 &entity.table_name,
438 field_count,
439 ));
440 } else {
441 let existing_columns = executor.table_columns(&entity.table_name)?;
443 let mut fields_added = 0;
444 for property in &entity.properties {
445 let bare_column = strip_identifier_quotes(&property.column_name).to_lowercase();
446 if existing_columns.contains(&bare_column) {
447 continue;
448 }
449 let sql = dialect.compile_add_column(entity, property)?;
450 executor.lock()?.execute(&sql, [])?;
451 let _ = ctx.send_event(RawAuditEvent::field_added(
452 &entity.name,
453 &entity.table_name,
454 &property.column_name,
455 ));
456 fields_added += 1;
457 }
458 let _ = ctx.send_event(RawAuditEvent::schema_verified(
459 &entity.name,
460 &entity.table_name,
461 field_count,
462 ));
463 let _ = fields_added; }
465 }
466
467 let mut seed_counts: BTreeMap<String, (usize, usize)> = BTreeMap::new(); for graph in ctx.initial_graphs() {
470 let entity = ctx.entity(&graph.entity).ok_or_else(|| {
471 MutationExecutorError::Bind(format!("missing entity: {}", graph.entity))
472 })?;
473 let counts = seed_counts.entry(graph.entity.clone()).or_insert((0, 0));
474 if initial_graph_exists_sqlite(executor, dialect, entity, graph)? {
475 if let Some(query) = compile_initial_graph_update(dialect, entity, graph)? {
476 executor.execute(&query)?;
477 }
478 counts.1 += 1; } else {
480 let query = compile_initial_graph_insert(dialect, entity, graph)?;
481 executor.execute(&query)?;
482 counts.0 += 1; }
484 }
485
486 for (entity_name, (inserted, updated)) in &seed_counts {
488 let entity = ctx.entity(entity_name).ok_or_else(|| {
489 MutationExecutorError::Bind(format!("missing entity: {}", entity_name))
490 })?;
491 let _ = ctx.send_event(RawAuditEvent::data_seeded(
492 entity_name,
493 &entity.table_name,
494 *inserted,
495 *updated,
496 ));
497 }
498
499 Ok(())
500}
501
502impl SqliteSchemaExt for UserContext {
503 fn ensure_sqlite_schema(
504 &self,
505 ) -> Pin<Box<dyn Future<Output = Result<(), MutationExecutorError>> + Send + '_>> {
506 Box::pin(async move { ensure_sqlite_schema_for(self) })
507 }
508}
509
/// Field-less `SchemaProvider` implementation backed by
/// `ensure_sqlite_schema_for`.
#[derive(Debug, Default, Clone, Copy)]
pub struct SqliteSchemaProvider;
512
513impl SchemaProvider for SqliteSchemaProvider {
514 fn ensure_schema<'a>(
515 &'a self,
516 ctx: &'a UserContext,
517 ) -> Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>> {
518 Box::pin(async move {
519 ensure_sqlite_schema_for(ctx).map_err(|err| RuntimeError::Schema(err.to_string()))
520 })
521 }
522}
523
/// Extension trait for wiring a SQLite executor into a context.
pub trait SqliteProviderExt {
    /// Takes ownership of `executor` and returns `self` for call chaining.
    fn use_sqlite_provider(&mut self, executor: SqliteMutationExecutor) -> &mut Self;
}
527
528impl SqliteProviderExt for UserContext {
529 fn use_sqlite_provider(&mut self, executor: SqliteMutationExecutor) -> &mut Self {
530 self.insert_resource(SqliteDialect);
531 self.insert_resource(executor);
532 self.set_schema_provider(SqliteSchemaProvider);
533 self
534 }
535}
536
/// Generates ids from a per-entity counter row stored in a SQLite table.
#[derive(Clone)]
pub struct SqliteIdSpaceGenerator {
    // Executor whose connection holds the counter table.
    executor: SqliteMutationExecutor,
    // Name of the counter table; defaults to `DEFAULT_ID_SPACE_TABLE`.
    table_name: String,
}
542
543impl SqliteIdSpaceGenerator {
544 pub fn new(connection: Connection) -> Self {
545 Self::from_executor(SqliteMutationExecutor::from_connection(connection))
546 }
547
548 pub fn from_executor(executor: SqliteMutationExecutor) -> Self {
549 Self {
550 executor,
551 table_name: DEFAULT_ID_SPACE_TABLE.to_owned(),
552 }
553 }
554
555 pub fn with_table_name(mut self, table_name: impl Into<String>) -> Self {
556 self.table_name = table_name.into();
557 self
558 }
559
560 pub fn ensure_table(&self) -> Result<(), MutationExecutorError> {
561 self.executor.ensure_id_space_table(&self.table_name)
562 }
563
564 pub fn next_id(&self, entity: &str) -> Result<u64, MutationExecutorError> {
565 self.ensure_table()?;
566 let sql = format!(
567 "INSERT INTO {} (type_name, current_level) VALUES (?, 1) \
568 ON CONFLICT (type_name) DO UPDATE \
569 SET current_level = current_level + 1 \
570 RETURNING current_level",
571 quote_ident(&self.table_name)
572 );
573 let id: i64 = self
574 .executor
575 .lock()?
576 .query_row(&sql, [entity], |row| row.get(0))?;
577 u64::try_from(id).map_err(|_| {
578 MutationExecutorError::Bind(format!("generated id {id} cannot be represented as u64"))
579 })
580 }
581}
582
583impl InternalIdGenerator for SqliteIdSpaceGenerator {
584 fn generate_id(&self, entity: &str) -> Result<u64, RuntimeError> {
585 self.next_id(entity)
586 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))
587 }
588}
589
/// Quotes `ident` with double quotes via `quote_identifier_if_needed`, which
/// decides whether quoting is required.
fn quote_ident(ident: &str) -> String {
    quote_identifier_if_needed(ident, '"')
}
593
/// Removes one pair of surrounding identifier quotes from `ident`.
///
/// Recognised pairs are `"…"`, `` `…` `` and `[…]`. Anything else, including
/// a lone quote character or mismatched delimiters, is returned unchanged.
fn strip_identifier_quotes(ident: &str) -> &str {
    const QUOTE_PAIRS: [(char, char); 3] = [('"', '"'), ('`', '`'), ('[', ']')];

    QUOTE_PAIRS
        .iter()
        .find_map(|&(open, close)| ident.strip_prefix(open)?.strip_suffix(close))
        .unwrap_or(ident)
}
614
615fn bind_values(values: &[Value]) -> Result<Vec<SqliteValue>, MutationExecutorError> {
616 values.iter().map(bind_sqlite_value).collect()
617}
618
619fn bind_sqlite_value(value: &Value) -> Result<SqliteValue, MutationExecutorError> {
620 match value {
621 Value::Null => Ok(SqliteValue::Null),
622 Value::Bool(v) => Ok(SqliteValue::Integer(i64::from(*v))),
623 Value::I64(v) => Ok(SqliteValue::Integer(*v)),
624 Value::U64(v) => i64::try_from(*v)
625 .map(SqliteValue::Integer)
626 .map_err(|_| MutationExecutorError::Bind(format!("u64 value {v} exceeds i64 range"))),
627 Value::F64(v) => Ok(SqliteValue::Real(*v)),
628 Value::Decimal(v) => Ok(SqliteValue::Text(format!("{SQLITE_DECIMAL_PREFIX}{v}"))),
629 Value::Text(v) => Ok(SqliteValue::Text(v.clone())),
630 Value::Json(v) => Ok(SqliteValue::Text(v.to_string())),
631 Value::Date(v) => Ok(SqliteValue::Text(v.format("%Y-%m-%d").to_string())),
632 Value::Timestamp(v) => Ok(SqliteValue::Text(v.to_rfc3339())),
633 Value::Object(_) => Err(MutationExecutorError::UnsupportedValue("object")),
634 Value::List(_) => Err(MutationExecutorError::UnsupportedValue("list")),
635 }
636}
637
/// Name and declared type of one result column, captured from a prepared
/// statement and used while decoding rows.
#[derive(Debug, Clone)]
struct ColumnInfo {
    // Column name as reported by the statement; used as the record key.
    name: String,
    // Upper-cased declared type, or `None` when the statement reports none.
    decl_type: Option<String>,
}
643
644fn statement_columns(statement: &rusqlite::Statement<'_>) -> Vec<ColumnInfo> {
645 statement
646 .columns()
647 .into_iter()
648 .map(|column| ColumnInfo {
649 name: column.name().to_owned(),
650 decl_type: column.decl_type().map(|value| value.to_ascii_uppercase()),
651 })
652 .collect()
653}
654
655fn decode_sqlite_row(
656 row: &Row<'_>,
657 columns: &[ColumnInfo],
658) -> Result<Record, MutationExecutorError> {
659 let mut record = BTreeMap::new();
660 for (index, column) in columns.iter().enumerate() {
661 let value_ref = row.get_ref(index)?;
662 let value = match value_ref {
663 ValueRef::Null => Value::Null,
664 ValueRef::Integer(value) => decode_sqlite_integer(value, column),
665 ValueRef::Real(value) => Value::F64(value),
666 ValueRef::Text(value) => decode_sqlite_text(value, column)?,
667 ValueRef::Blob(_) => {
668 return Err(MutationExecutorError::UnsupportedColumnType(
669 "BLOB".to_owned(),
670 ));
671 }
672 };
673 record.insert(column.name.clone(), value);
674 }
675 Ok(record)
676}
677
678fn decode_sqlite_integer(value: i64, column: &ColumnInfo) -> Value {
679 match column_decl_type(column).as_deref() {
680 Some("BOOLEAN") | Some("BOOL") => Value::Bool(value != 0),
681 _ => Value::I64(value),
682 }
683}
684
685fn decode_sqlite_text(value: &[u8], column: &ColumnInfo) -> Result<Value, MutationExecutorError> {
686 let value = std::str::from_utf8(value)
687 .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite text: {err}")))?;
688 if let Some(decimal) = value.strip_prefix(SQLITE_DECIMAL_PREFIX) {
689 return Decimal::from_str(decimal)
690 .map(Value::Decimal)
691 .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite decimal: {err}")));
692 }
693
694 match column_decl_type(column).as_deref() {
695 Some("NUMERIC") | Some("DECIMAL") => Decimal::from_str(value)
696 .map(Value::Decimal)
697 .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite decimal: {err}"))),
698 Some("JSON") => serde_json::from_str(value).map(Value::Json).map_err(|err| {
699 MutationExecutorError::Bind(format!("invalid sqlite json value: {err}"))
700 }),
701 Some("DATE") => NaiveDate::parse_from_str(value, "%Y-%m-%d")
702 .map(Value::Date)
703 .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite date: {err}"))),
704 Some("TIMESTAMP") | Some("DATETIME") => parse_sqlite_timestamp(value),
705 _ => infer_sqlite_text(value),
706 }
707}
708
709fn infer_sqlite_text(value: &str) -> Result<Value, MutationExecutorError> {
710 if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
711 return Ok(Value::Date(date));
712 }
713 if let Ok(timestamp) = DateTime::parse_from_rfc3339(value) {
714 return Ok(Value::Timestamp(timestamp.with_timezone(&Utc)));
715 }
716 if let Ok(timestamp) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
717 return Ok(Value::Timestamp(Utc.from_utc_datetime(×tamp)));
718 }
719 Ok(Value::Text(value.to_owned()))
720}
721
722fn parse_sqlite_timestamp(value: &str) -> Result<Value, MutationExecutorError> {
723 if let Ok(timestamp) = DateTime::parse_from_rfc3339(value) {
724 return Ok(Value::Timestamp(timestamp.with_timezone(&Utc)));
725 }
726 NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S")
727 .map(|timestamp| Value::Timestamp(Utc.from_utc_datetime(×tamp)))
728 .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite timestamp: {err}")))
729}
730
731fn column_decl_type(column: &ColumnInfo) -> Option<String> {
732 column
733 .decl_type
734 .as_ref()
735 .map(|value| value.split('(').next().unwrap_or(value).trim().to_owned())
736}
737
#[cfg(test)]
mod tests {
    use super::*;
    use teaql_core::{DeleteCommand, RecoverCommand};
    use teaql_runtime::InMemoryMetadataStore;

    /// `Order` entity mapped to the `orders` table with id, version and name.
    fn entity() -> EntityDescriptor {
        EntityDescriptor::new("Order")
            .table_name("orders")
            .property(
                PropertyDescriptor::new("id", DataType::U64)
                    .column_name("id")
                    .id()
                    .not_null(),
            )
            .property(
                PropertyDescriptor::new("version", DataType::I64)
                    .column_name("version")
                    .version()
                    .not_null(),
            )
            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
    }

    /// Executor over a fresh in-memory database.
    ///
    /// Uses `from_connection`: `new` expects an `Arc<Mutex<Connection>>`, so
    /// passing a bare `Connection` to it does not type-check.
    fn memory_executor() -> SqliteMutationExecutor {
        SqliteMutationExecutor::from_connection(Connection::open_in_memory().unwrap())
    }

    /// Hand-written query without a comment.
    fn raw_query(sql: &str, params: Vec<Value>) -> CompiledQuery {
        CompiledQuery {
            sql: sql.to_owned(),
            params,
            comment: None,
        }
    }

    /// Creates the `orders` table used by the streaming tests.
    fn create_orders_table(executor: &SqliteMutationExecutor) {
        executor
            .execute(&raw_query(
                "CREATE TABLE orders (id INTEGER PRIMARY KEY, version INTEGER, name TEXT)",
                Vec::new(),
            ))
            .unwrap();
    }

    /// Inserts orders with ids `1..=count`, version 1 and name `order-{id}`.
    fn insert_orders(executor: &SqliteMutationExecutor, entity: &EntityDescriptor, count: u64) {
        for i in 1..=count {
            let insert = SqliteDialect
                .compile_insert(
                    entity,
                    &InsertCommand::new("Order")
                        .value("id", i)
                        .value("version", 1_i64)
                        .value("name", format!("order-{i}")),
                )
                .unwrap();
            executor.execute(&insert).unwrap();
        }
    }

    #[test]
    fn sqlite_dialect_compiles_mutations_and_schema() {
        let insert = SqliteDialect
            .compile_insert(
                &entity(),
                &InsertCommand::new("Order")
                    .value("id", 1_u64)
                    .value("name", "A"),
            )
            .unwrap();
        assert_eq!(insert.sql, "INSERT INTO orders (id, name) VALUES (?, ?)");

        let update = SqliteDialect
            .compile_update(
                &entity(),
                &UpdateCommand::new("Order", 1_u64)
                    .expected_version(3)
                    .value("name", "B"),
            )
            .unwrap();
        assert_eq!(
            update.sql,
            "UPDATE orders SET name = ?, version = ? WHERE id = ? AND version = ?"
        );

        let delete = SqliteDialect
            .compile_delete(
                &entity(),
                &DeleteCommand::new("Order", 1_u64).expected_version(3),
            )
            .unwrap();
        let recover = SqliteDialect
            .compile_recover(&entity(), &RecoverCommand::new("Order", 1_u64, -4))
            .unwrap();
        assert_eq!(
            delete.sql,
            "UPDATE orders SET version = ? WHERE id = ? AND version = ?"
        );
        assert_eq!(
            recover.sql,
            "UPDATE orders SET version = ? WHERE id = ? AND version = ?"
        );

        let create = SqliteDialect.compile_create_table(&entity()).unwrap();
        assert_eq!(
            create,
            "CREATE TABLE IF NOT EXISTS orders (id INTEGER PRIMARY KEY NOT NULL, version INTEGER NOT NULL, name TEXT)"
        );
    }

    #[test]
    fn sqlite_executor_ensures_schema_and_roundtrips_rows() {
        let executor = memory_executor();
        let entity = entity();
        let mut ctx = UserContext::new()
            .with_metadata(InMemoryMetadataStore::new().with_entity(entity.clone()));

        ctx.use_sqlite_provider(executor.clone());
        ensure_sqlite_schema_for(&ctx).unwrap();

        let insert = SqliteDialect
            .compile_insert(
                &entity,
                &InsertCommand::new("Order")
                    .value("id", 1_u64)
                    .value("version", 1_i64)
                    .value("name", "draft"),
            )
            .unwrap();
        assert_eq!(executor.execute(&insert).unwrap(), 1);

        let select = SqliteDialect
            .compile_select(
                &entity,
                &SelectQuery::new("Order")
                    .filter(Expr::eq("id", 1_u64))
                    .order_asc("id"),
            )
            .unwrap();
        let rows = executor.fetch_all(&select).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
        assert_eq!(rows[0].get("version"), Some(&Value::I64(1)));
        assert_eq!(rows[0].get("name"), Some(&Value::Text("draft".to_owned())));
    }

    #[test]
    fn sqlite_executor_parses_json_only_for_json_columns() {
        let executor = memory_executor();

        executor
            .execute(&raw_query(
                "CREATE TABLE payloads (text_payload TEXT, json_payload JSON)",
                Vec::new(),
            ))
            .unwrap();
        executor
            .execute(&raw_query(
                "INSERT INTO payloads (text_payload, json_payload) VALUES (?, ?)",
                vec![
                    Value::Text("{\"active\":true}".to_owned()),
                    Value::Json(serde_json::json!({"active": true})),
                ],
            ))
            .unwrap();

        let rows = executor
            .fetch_all(&raw_query(
                "SELECT text_payload, json_payload FROM payloads",
                Vec::new(),
            ))
            .unwrap();

        assert_eq!(
            rows[0].get("text_payload"),
            Some(&Value::Text("{\"active\":true}".to_owned()))
        );
        assert_eq!(
            rows[0].get("json_payload"),
            Some(&Value::Json(serde_json::json!({"active": true})))
        );
    }

    #[test]
    fn sqlite_id_space_generator_increments_ids() {
        let generator = SqliteIdSpaceGenerator::from_executor(memory_executor());
        assert_eq!(generator.next_id("Order").unwrap(), 1);
        assert_eq!(generator.next_id("Order").unwrap(), 2);
    }

    #[test]
    fn sqlite_fetch_stream_returns_chunked_rows() {
        let executor = memory_executor();
        let entity = entity();

        create_orders_table(&executor);
        insert_orders(&executor, &entity, 25);

        let query = SelectQuery::new("Order")
            .filter(Expr::gt("version", 0_i64))
            .order_asc("id")
            .stream(10);
        let compiled = SqliteDialect.compile_select(&entity, &query).unwrap();

        let chunks = executor.fetch_stream(&compiled, 10).unwrap();

        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0].rows.len(), 10);
        assert_eq!(chunks[0].chunk_index, 0);
        assert!(!chunks[0].is_last);

        assert_eq!(chunks[1].rows.len(), 10);
        assert_eq!(chunks[1].chunk_index, 1);
        assert!(!chunks[1].is_last);

        assert_eq!(chunks[2].rows.len(), 5);
        assert_eq!(chunks[2].chunk_index, 2);
        assert!(chunks[2].is_last);

        assert_eq!(
            chunks[0].rows[0].get("name"),
            Some(&Value::Text("order-1".to_owned()))
        );
        assert_eq!(
            chunks[2].rows[4].get("name"),
            Some(&Value::Text("order-25".to_owned()))
        );
    }

    #[test]
    fn sqlite_fetch_stream_handles_empty_result() {
        let executor = memory_executor();
        create_orders_table(&executor);

        let entity = entity();
        let query = SelectQuery::new("Order")
            .filter(Expr::gt("version", 0_i64))
            .stream(10);
        let compiled = SqliteDialect.compile_select(&entity, &query).unwrap();

        let chunks = executor.fetch_stream(&compiled, 10).unwrap();

        // An empty result still produces one terminal chunk.
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].rows.len(), 0);
        assert!(chunks[0].is_last);
    }

    #[test]
    fn sqlite_fetch_stream_exact_chunk_boundary() {
        let executor = memory_executor();
        let entity = entity();

        create_orders_table(&executor);
        insert_orders(&executor, &entity, 20);

        let query = SelectQuery::new("Order")
            .filter(Expr::gt("version", 0_i64))
            .order_asc("id")
            .stream(10);
        let compiled = SqliteDialect.compile_select(&entity, &query).unwrap();

        let chunks = executor.fetch_stream(&compiled, 10).unwrap();

        // Two full chunks, followed by an empty terminal chunk.
        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0].rows.len(), 10);
        assert!(!chunks[0].is_last);
        assert_eq!(chunks[1].rows.len(), 10);
        assert!(!chunks[1].is_last);
        assert_eq!(chunks[2].rows.len(), 0);
        assert!(chunks[2].is_last);
    }
}