1use std::collections::{BTreeMap, BTreeSet};
2use std::future::Future;
3use std::pin::Pin;
4use std::str::FromStr;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
8use rusqlite::types::{Value as SqliteValue, ValueRef};
9use rusqlite::{Connection, Row, params_from_iter};
10use rust_decimal::Decimal;
11use teaql_core::{
12 DataType, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record, SelectQuery,
13 UpdateCommand, Value,
14};
15use teaql_runtime::{
16 RawAuditEvent, GraphNode, InternalIdGenerator,
17 RuntimeError, SchemaProvider, UserContext,
18};
19use teaql_sql::{
20 CompiledQuery, DatabaseKind, SqlCompileError, SqlDialect,
21 SqlTransport, quote_identifier_if_needed,
22};
23
/// Name of the table that holds the per-entity id counters when no other
/// table name has been configured.
pub const DEFAULT_ID_SPACE_TABLE: &str = "teaql_id_space";

/// Marker prepended to decimal values when they are bound as SQLite text.
/// Text read back with this prefix is decoded as a decimal before the
/// column's declared type is even consulted.
const SQLITE_DECIMAL_PREFIX: &str = "__teaql_decimal__:";
27
/// Field-less marker type that selects the SQLite flavour of SQL generation.
#[derive(Clone, Copy, Debug, Default)]
pub struct SqliteDialect;
30
31impl SqlDialect for SqliteDialect {
32 fn kind(&self) -> DatabaseKind {
33 DatabaseKind::Sqlite
34 }
35
36 fn quote_ident(&self, ident: &str) -> String {
37 quote_ident(ident)
38 }
39
40 fn placeholder(&self, _index: usize) -> String {
41 "?".to_owned()
42 }
43
44 fn schema_type_sql(
45 &self,
46 data_type: DataType,
47 property: &PropertyDescriptor,
48 ) -> Result<&'static str, SqlCompileError> {
49 match data_type {
50 DataType::Bool => Ok("INTEGER"),
51 DataType::I64 | DataType::U64 if property.is_id => Ok("INTEGER"),
52 DataType::I64 | DataType::U64 => Ok("INTEGER"),
53 DataType::F64 => Ok("REAL"),
54 DataType::Decimal => Ok("NUMERIC"),
55 DataType::Text => Ok("TEXT"),
56 DataType::Json => Ok("JSON"),
57 DataType::Date => Ok("DATE"),
58 DataType::Timestamp => Ok("TIMESTAMP"),
59 }
60 }
61
62 fn compile_add_column(
63 &self,
64 entity: &EntityDescriptor,
65 property: &PropertyDescriptor,
66 ) -> Result<String, SqlCompileError> {
67 let def = self.column_definition_sql(property)?;
71 let def_without_not_null = def.replace(" NOT NULL", "");
72
73 Ok(format!(
74 "ALTER TABLE {} ADD COLUMN {}",
75 self.quote_ident(&entity.table_name),
76 def_without_not_null
77 ))
78 }
79}
80
81#[derive(Debug)]
82pub enum MutationExecutorError {
83 Sqlite(rusqlite::Error),
84 SqlCompile(SqlCompileError),
85 UnsupportedValue(&'static str),
86 UnsupportedColumnType(String),
87 Bind(String),
88 Lock(String),
89}
90
91impl std::fmt::Display for MutationExecutorError {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 match self {
94 Self::Sqlite(err) => err.fmt(f),
95 Self::SqlCompile(err) => err.fmt(f),
96 Self::UnsupportedValue(kind) => {
97 write!(
98 f,
99 "unsupported rusqlite bind value for mutation executor: {kind}"
100 )
101 }
102 Self::UnsupportedColumnType(kind) => {
103 write!(
104 f,
105 "unsupported rusqlite column type for record decoding: {kind}"
106 )
107 }
108 Self::Bind(message) => write!(f, "rusqlite bind error: {message}"),
109 Self::Lock(message) => write!(f, "rusqlite connection lock error: {message}"),
110 }
111 }
112}
113
114impl std::error::Error for MutationExecutorError {}
115
116impl From<rusqlite::Error> for MutationExecutorError {
117 fn from(value: rusqlite::Error) -> Self {
118 Self::Sqlite(value)
119 }
120}
121
122impl From<SqlCompileError> for MutationExecutorError {
123 fn from(value: SqlCompileError) -> Self {
124 Self::SqlCompile(value)
125 }
126}
127
128#[derive(Clone)]
129pub struct SqliteMutationExecutor {
130 connection: Arc<Mutex<Connection>>,
131}
132
133impl SqliteMutationExecutor {
134 pub fn new(connection: Connection) -> Self {
135 Self::from_shared_connection(Arc::new(Mutex::new(connection)))
136 }
137
138 pub fn from_shared_connection(connection: Arc<Mutex<Connection>>) -> Self {
139 Self { connection }
140 }
141
142 pub fn connection(&self) -> Arc<Mutex<Connection>> {
143 Arc::clone(&self.connection)
144 }
145
146 pub fn ensure_schema(
147 &self,
148 dialect: &SqliteDialect,
149 entities: &[&EntityDescriptor],
150 ) -> Result<(), MutationExecutorError> {
151 self.ensure_id_space_table(DEFAULT_ID_SPACE_TABLE)?;
152
153 for entity in entities {
154 if !self.table_exists(&entity.table_name)? {
155 let sql = dialect.compile_create_table(entity)?;
156 self.lock()?.execute(&sql, [])?;
157 continue;
158 }
159
160 let existing_columns = self.table_columns(&entity.table_name)?;
161 for property in &entity.properties {
162 let bare_column = strip_identifier_quotes(&property.column_name).to_lowercase();
163 if existing_columns.contains(&bare_column) {
164 continue;
165 }
166 let sql = dialect.compile_add_column(entity, property)?;
167 self.lock()?.execute(&sql, [])?;
168 }
169 }
170 Ok(())
171 }
172
173 pub fn ensure_id_space_table(&self, table_name: &str) -> Result<(), MutationExecutorError> {
174 let sql = format!(
175 "CREATE TABLE IF NOT EXISTS {} (type_name VARCHAR(100) PRIMARY KEY, current_level BIGINT NOT NULL)",
176 quote_ident(table_name)
177 );
178 self.lock()?.execute(&sql, [])?;
179 Ok(())
180 }
181
182 pub fn begin_transaction(&self) -> Result<(), MutationExecutorError> {
183 self.lock()?.execute("BEGIN IMMEDIATE", [])?;
184 Ok(())
185 }
186
187 pub fn commit_transaction(&self) -> Result<(), MutationExecutorError> {
188 self.lock()?.execute("COMMIT", [])?;
189 Ok(())
190 }
191
192 pub fn rollback_transaction(&self) -> Result<(), MutationExecutorError> {
193 self.lock()?.execute("ROLLBACK", [])?;
194 Ok(())
195 }
196
197 pub fn execute(&self, query: &CompiledQuery) -> Result<u64, MutationExecutorError> {
198 let params = bind_values(&query.params)?;
199 let rows = self
200 .lock()?
201 .execute(&query.sql_with_comment(), params_from_iter(params.iter()))?;
202 Ok(rows as u64)
203 }
204
205 pub fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, MutationExecutorError> {
206 let params = bind_values(&query.params)?;
207 let connection = self.lock()?;
208 let mut statement = connection.prepare(&query.sql_with_comment())?;
209 let columns = statement_columns(&statement);
210 let mut rows = statement.query(params_from_iter(params.iter()))?;
211 let mut records = Vec::new();
212 while let Some(row) = rows.next()? {
213 records.push(decode_sqlite_row(row, &columns)?);
214 }
215 Ok(records)
216 }
217
218 pub fn table_exists(&self, table_name: &str) -> Result<bool, MutationExecutorError> {
219 let exists: i64 = self.lock()?.query_row(
220 "SELECT COUNT(1) FROM sqlite_master WHERE type = 'table' AND name = ?",
221 [table_name],
222 |row| row.get(0),
223 )?;
224 Ok(exists > 0)
225 }
226
227 pub fn table_columns(&self, table_name: &str) -> Result<BTreeSet<String>, MutationExecutorError> {
228 let pragma_sql = format!("PRAGMA table_info({})", quote_ident(table_name));
229 let connection = self.lock()?;
230 let mut statement = connection.prepare(&pragma_sql)?;
231 let rows = statement.query_map([], |row| row.get::<_, String>("name"))?;
232 let mut columns = BTreeSet::new();
233 for row in rows {
234 columns.insert(row?.to_lowercase());
235 }
236 Ok(columns)
237 }
238
239 fn lock(&self) -> Result<MutexGuard<'_, Connection>, MutationExecutorError> {
240 self.connection
241 .lock()
242 .map_err(|err| MutationExecutorError::Lock(err.to_string()))
243 }
244}
245
246
247impl SqlTransport for SqliteMutationExecutor {
248 type Error = MutationExecutorError;
249
250 async fn fetch_all_sql(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
251 SqliteMutationExecutor::fetch_all(self, query)
252 }
253
254 async fn execute_sql(&self, query: &CompiledQuery) -> Result<u64, Self::Error> {
255 SqliteMutationExecutor::execute(self, query)
256 }
257}
258
259impl teaql_sql::SqlTransaction for SqliteMutationExecutor {
260 type Error = MutationExecutorError;
261
262 async fn commit_sql(self) -> Result<(), Self::Error> {
263 self.commit_transaction()
264 }
265
266 async fn rollback_sql(self) -> Result<(), Self::Error> {
267 self.rollback_transaction()
268 }
269}
270
271impl teaql_sql::SqlTransactionTransport for SqliteMutationExecutor {
272 type Tx<'a> = Self where Self: 'a;
273
274 async fn begin_sql(&self) -> Result<Self::Tx<'_>, Self::Error> {
275 self.begin_transaction()?;
276 Ok(self.clone())
277 }
278}
279
280
281
282fn initial_graph_exists_sqlite(
283 executor: &SqliteMutationExecutor,
284 dialect: &SqliteDialect,
285 entity: &EntityDescriptor,
286 graph: &GraphNode,
287) -> Result<bool, MutationExecutorError> {
288 let Some(id) = graph.values.get("id") else {
289 return Ok(false);
290 };
291 let query = dialect.compile_select(
292 entity,
293 &SelectQuery::new(&graph.entity)
294 .project("id")
295 .filter(Expr::eq("id", id.clone()))
296 .limit(1),
297 )?;
298 Ok(!executor.fetch_all(&query)?.is_empty())
299}
300
301fn compile_initial_graph_insert(
302 dialect: &impl SqlDialect,
303 entity: &EntityDescriptor,
304 graph: &GraphNode,
305) -> Result<CompiledQuery, MutationExecutorError> {
306 let mut command = InsertCommand::new(&graph.entity);
307 for (field, value) in &graph.values {
308 command = command.value(field.clone(), value.clone());
309 }
310 dialect.compile_insert(entity, &command).map_err(Into::into)
311}
312
313fn compile_initial_graph_update(
314 dialect: &impl SqlDialect,
315 entity: &EntityDescriptor,
316 graph: &GraphNode,
317) -> Result<Option<CompiledQuery>, MutationExecutorError> {
318 let Some(id) = graph.values.get("id") else {
319 return Ok(None);
320 };
321 let mut command = UpdateCommand::new(&graph.entity, id.clone());
322 for (field, value) in &graph.values {
323 if field == "id" {
324 continue;
325 }
326 command = command.value(field.clone(), value.clone());
327 }
328 match dialect.compile_update(entity, &command) {
329 Ok(query) => Ok(Some(query)),
330 Err(SqlCompileError::EmptyMutation(_)) => Ok(None),
331 Err(err) => Err(err.into()),
332 }
333}
334
335pub trait SqliteSchemaExt {
336 fn ensure_sqlite_schema(
337 &self,
338 ) -> Pin<Box<dyn Future<Output = Result<(), MutationExecutorError>> + Send + '_>>;
339}
340
341pub fn ensure_sqlite_schema_for(ctx: &UserContext) -> Result<(), MutationExecutorError> {
342 let dialect = ctx.get_resource::<SqliteDialect>().ok_or_else(|| {
343 MutationExecutorError::Bind("missing typed resource: SqliteDialect".to_owned())
344 })?;
345 let executor = ctx
346 .get_resource::<SqliteMutationExecutor>()
347 .ok_or_else(|| {
348 MutationExecutorError::Bind(
349 "missing typed resource: SqliteMutationExecutor".to_owned(),
350 )
351 })?;
352
353 let entities = ctx.all_entities();
354
355 executor.ensure_id_space_table(DEFAULT_ID_SPACE_TABLE)?;
357
358 for entity in &entities {
360 let field_count = entity.properties.len();
361 if !executor.table_exists(&entity.table_name)? {
362 let sql = dialect.compile_create_table(entity)?;
364 executor.lock()?.execute(&sql, [])?;
365 let _ = ctx.send_event(RawAuditEvent::schema_created(
366 &entity.name,
367 &entity.table_name,
368 field_count,
369 ));
370 } else {
371 let existing_columns = executor.table_columns(&entity.table_name)?;
373 let mut fields_added = 0;
374 for property in &entity.properties {
375 let bare_column = strip_identifier_quotes(&property.column_name).to_lowercase();
376 if existing_columns.contains(&bare_column) {
377 continue;
378 }
379 let sql = dialect.compile_add_column(entity, property)?;
380 executor.lock()?.execute(&sql, [])?;
381 let _ = ctx.send_event(RawAuditEvent::field_added(
382 &entity.name,
383 &entity.table_name,
384 &property.column_name,
385 ));
386 fields_added += 1;
387 }
388 let _ = ctx.send_event(RawAuditEvent::schema_verified(
389 &entity.name,
390 &entity.table_name,
391 field_count,
392 ));
393 let _ = fields_added; }
395 }
396
397 let mut seed_counts: BTreeMap<String, (usize, usize)> = BTreeMap::new(); for graph in ctx.initial_graphs() {
400 let entity = ctx.entity(&graph.entity).ok_or_else(|| {
401 MutationExecutorError::Bind(format!("missing entity: {}", graph.entity))
402 })?;
403 let counts = seed_counts.entry(graph.entity.clone()).or_insert((0, 0));
404 if initial_graph_exists_sqlite(executor, dialect, entity, graph)? {
405 if let Some(query) = compile_initial_graph_update(dialect, entity, graph)? {
406 executor.execute(&query)?;
407 }
408 counts.1 += 1; } else {
410 let query = compile_initial_graph_insert(dialect, entity, graph)?;
411 executor.execute(&query)?;
412 counts.0 += 1; }
414 }
415
416 for (entity_name, (inserted, updated)) in &seed_counts {
418 let entity = ctx.entity(entity_name).ok_or_else(|| {
419 MutationExecutorError::Bind(format!("missing entity: {}", entity_name))
420 })?;
421 let _ = ctx.send_event(RawAuditEvent::data_seeded(
422 entity_name,
423 &entity.table_name,
424 *inserted,
425 *updated,
426 ));
427 }
428
429 Ok(())
430}
431
432impl SqliteSchemaExt for UserContext {
433 fn ensure_sqlite_schema(
434 &self,
435 ) -> Pin<Box<dyn Future<Output = Result<(), MutationExecutorError>> + Send + '_>> {
436 Box::pin(async move { ensure_sqlite_schema_for(self) })
437 }
438}
439
/// Field-less schema provider that synchronises a SQLite database.
#[derive(Clone, Copy, Debug, Default)]
pub struct SqliteSchemaProvider;
442
443impl SchemaProvider for SqliteSchemaProvider {
444 fn ensure_schema<'a>(
445 &'a self,
446 ctx: &'a UserContext,
447 ) -> Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>> {
448 Box::pin(async move {
449 ensure_sqlite_schema_for(ctx).map_err(|err| RuntimeError::Schema(err.to_string()))
450 })
451 }
452}
453
454pub trait SqliteProviderExt {
455 fn use_sqlite_provider(&mut self, executor: SqliteMutationExecutor) -> &mut Self;
456}
457
458impl SqliteProviderExt for UserContext {
459 fn use_sqlite_provider(&mut self, executor: SqliteMutationExecutor) -> &mut Self {
460 self.insert_resource(SqliteDialect);
461 self.insert_resource(executor);
462 self.set_schema_provider(SqliteSchemaProvider);
463 self
464 }
465}
466
467#[derive(Clone)]
468pub struct SqliteIdSpaceGenerator {
469 executor: SqliteMutationExecutor,
470 table_name: String,
471}
472
473impl SqliteIdSpaceGenerator {
474 pub fn new(connection: Connection) -> Self {
475 Self::from_executor(SqliteMutationExecutor::new(connection))
476 }
477
478 pub fn from_executor(executor: SqliteMutationExecutor) -> Self {
479 Self {
480 executor,
481 table_name: DEFAULT_ID_SPACE_TABLE.to_owned(),
482 }
483 }
484
485 pub fn with_table_name(mut self, table_name: impl Into<String>) -> Self {
486 self.table_name = table_name.into();
487 self
488 }
489
490 pub fn ensure_table(&self) -> Result<(), MutationExecutorError> {
491 self.executor.ensure_id_space_table(&self.table_name)
492 }
493
494 pub fn next_id(&self, entity: &str) -> Result<u64, MutationExecutorError> {
495 self.ensure_table()?;
496 let sql = format!(
497 "INSERT INTO {} (type_name, current_level) VALUES (?, 1) \
498 ON CONFLICT (type_name) DO UPDATE \
499 SET current_level = current_level + 1 \
500 RETURNING current_level",
501 quote_ident(&self.table_name)
502 );
503 let id: i64 = self
504 .executor
505 .lock()?
506 .query_row(&sql, [entity], |row| row.get(0))?;
507 u64::try_from(id).map_err(|_| {
508 MutationExecutorError::Bind(format!("generated id {id} cannot be represented as u64"))
509 })
510 }
511}
512
513impl InternalIdGenerator for SqliteIdSpaceGenerator {
514 fn generate_id(&self, entity: &str) -> Result<u64, RuntimeError> {
515 self.next_id(entity)
516 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))
517 }
518}
519
520fn quote_ident(ident: &str) -> String {
521 quote_identifier_if_needed(ident, '"')
522}
523
/// Removes one layer of identifier quoting from `ident`.
///
/// Recognised pairs are `"…"`, `` `…` `` and `[…]`. The opening and closing
/// characters must belong to the same pair; anything else, including a lone
/// quote character, is returned unchanged. Only the outermost layer is
/// removed.
fn strip_identifier_quotes(ident: &str) -> &str {
    const QUOTE_PAIRS: [(char, char); 3] = [('"', '"'), ('`', '`'), ('[', ']')];

    // `strip_prefix`/`strip_suffix` slice on character boundaries, so no
    // manual byte indexing or range arithmetic is needed.
    QUOTE_PAIRS
        .iter()
        .find_map(|&(open, close)| ident.strip_prefix(open)?.strip_suffix(close))
        .unwrap_or(ident)
}
544
545fn bind_values(values: &[Value]) -> Result<Vec<SqliteValue>, MutationExecutorError> {
546 values.iter().map(bind_sqlite_value).collect()
547}
548
549fn bind_sqlite_value(value: &Value) -> Result<SqliteValue, MutationExecutorError> {
550 match value {
551 Value::Null => Ok(SqliteValue::Null),
552 Value::Bool(v) => Ok(SqliteValue::Integer(i64::from(*v))),
553 Value::I64(v) => Ok(SqliteValue::Integer(*v)),
554 Value::U64(v) => i64::try_from(*v)
555 .map(SqliteValue::Integer)
556 .map_err(|_| MutationExecutorError::Bind(format!("u64 value {v} exceeds i64 range"))),
557 Value::F64(v) => Ok(SqliteValue::Real(*v)),
558 Value::Decimal(v) => Ok(SqliteValue::Text(format!("{SQLITE_DECIMAL_PREFIX}{v}"))),
559 Value::Text(v) => Ok(SqliteValue::Text(v.clone())),
560 Value::Json(v) => Ok(SqliteValue::Text(v.to_string())),
561 Value::Date(v) => Ok(SqliteValue::Text(v.format("%Y-%m-%d").to_string())),
562 Value::Timestamp(v) => Ok(SqliteValue::Text(v.to_rfc3339())),
563 Value::Object(_) => Err(MutationExecutorError::UnsupportedValue("object")),
564 Value::List(_) => Err(MutationExecutorError::UnsupportedValue("list")),
565 }
566}
567
/// What the decoder needs to know about one result column.
#[derive(Clone, Debug)]
struct ColumnInfo {
    /// Column name as reported by the prepared statement.
    name: String,
    /// Declared type in ASCII upper case, when the statement reports one.
    decl_type: Option<String>,
}
573
574fn statement_columns(statement: &rusqlite::Statement<'_>) -> Vec<ColumnInfo> {
575 statement
576 .columns()
577 .into_iter()
578 .map(|column| ColumnInfo {
579 name: column.name().to_owned(),
580 decl_type: column.decl_type().map(|value| value.to_ascii_uppercase()),
581 })
582 .collect()
583}
584
585fn decode_sqlite_row(
586 row: &Row<'_>,
587 columns: &[ColumnInfo],
588) -> Result<Record, MutationExecutorError> {
589 let mut record = BTreeMap::new();
590 for (index, column) in columns.iter().enumerate() {
591 let value_ref = row.get_ref(index)?;
592 let value = match value_ref {
593 ValueRef::Null => Value::Null,
594 ValueRef::Integer(value) => decode_sqlite_integer(value, column),
595 ValueRef::Real(value) => Value::F64(value),
596 ValueRef::Text(value) => decode_sqlite_text(value, column)?,
597 ValueRef::Blob(_) => {
598 return Err(MutationExecutorError::UnsupportedColumnType(
599 "BLOB".to_owned(),
600 ));
601 }
602 };
603 record.insert(column.name.clone(), value);
604 }
605 Ok(record)
606}
607
608fn decode_sqlite_integer(value: i64, column: &ColumnInfo) -> Value {
609 match column_decl_type(column).as_deref() {
610 Some("BOOLEAN") | Some("BOOL") => Value::Bool(value != 0),
611 _ => Value::I64(value),
612 }
613}
614
615fn decode_sqlite_text(value: &[u8], column: &ColumnInfo) -> Result<Value, MutationExecutorError> {
616 let value = std::str::from_utf8(value)
617 .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite text: {err}")))?;
618 if let Some(decimal) = value.strip_prefix(SQLITE_DECIMAL_PREFIX) {
619 return Decimal::from_str(decimal)
620 .map(Value::Decimal)
621 .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite decimal: {err}")));
622 }
623
624 match column_decl_type(column).as_deref() {
625 Some("NUMERIC") | Some("DECIMAL") => Decimal::from_str(value)
626 .map(Value::Decimal)
627 .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite decimal: {err}"))),
628 Some("JSON") => serde_json::from_str(value).map(Value::Json).map_err(|err| {
629 MutationExecutorError::Bind(format!("invalid sqlite json value: {err}"))
630 }),
631 Some("DATE") => NaiveDate::parse_from_str(value, "%Y-%m-%d")
632 .map(Value::Date)
633 .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite date: {err}"))),
634 Some("TIMESTAMP") | Some("DATETIME") => parse_sqlite_timestamp(value),
635 _ => infer_sqlite_text(value),
636 }
637}
638
639fn infer_sqlite_text(value: &str) -> Result<Value, MutationExecutorError> {
640 if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
641 return Ok(Value::Date(date));
642 }
643 if let Ok(timestamp) = DateTime::parse_from_rfc3339(value) {
644 return Ok(Value::Timestamp(timestamp.with_timezone(&Utc)));
645 }
646 if let Ok(timestamp) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
647 return Ok(Value::Timestamp(Utc.from_utc_datetime(×tamp)));
648 }
649 Ok(Value::Text(value.to_owned()))
650}
651
652fn parse_sqlite_timestamp(value: &str) -> Result<Value, MutationExecutorError> {
653 if let Ok(timestamp) = DateTime::parse_from_rfc3339(value) {
654 return Ok(Value::Timestamp(timestamp.with_timezone(&Utc)));
655 }
656 NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S")
657 .map(|timestamp| Value::Timestamp(Utc.from_utc_datetime(×tamp)))
658 .map_err(|err| MutationExecutorError::Bind(format!("invalid sqlite timestamp: {err}")))
659}
660
661fn column_decl_type(column: &ColumnInfo) -> Option<String> {
662 column
663 .decl_type
664 .as_ref()
665 .map(|value| value.split('(').next().unwrap_or(value).trim().to_owned())
666}
667
#[cfg(test)]
mod tests {
    use super::*;
    use teaql_core::{DeleteCommand, RecoverCommand};
    use teaql_runtime::InMemoryMetadataStore;

    /// The `Order` entity shared by the tests: a non-null id, a non-null
    /// version and a nullable name, stored in the `orders` table.
    fn entity() -> EntityDescriptor {
        let id = PropertyDescriptor::new("id", DataType::U64)
            .column_name("id")
            .id()
            .not_null();
        let version = PropertyDescriptor::new("version", DataType::I64)
            .column_name("version")
            .version()
            .not_null();
        let name = PropertyDescriptor::new("name", DataType::Text).column_name("name");

        EntityDescriptor::new("Order")
            .table_name("orders")
            .property(id)
            .property(version)
            .property(name)
    }

    /// An executor on a fresh in-memory database.
    fn in_memory_executor() -> SqliteMutationExecutor {
        SqliteMutationExecutor::new(Connection::open_in_memory().unwrap())
    }

    /// A hand-written query without a comment.
    fn raw_query(sql: &str, params: Vec<Value>) -> CompiledQuery {
        CompiledQuery {
            sql: sql.to_owned(),
            params,
            comment: None,
        }
    }

    #[test]
    fn sqlite_dialect_compiles_mutations_and_schema() {
        let order = entity();

        let insert_command = InsertCommand::new("Order")
            .value("id", 1_u64)
            .value("name", "A");
        let insert = SqliteDialect
            .compile_insert(&order, &insert_command)
            .unwrap();
        assert_eq!(insert.sql, "INSERT INTO orders (id, name) VALUES (?, ?)");

        let update_command = UpdateCommand::new("Order", 1_u64)
            .expected_version(3)
            .value("name", "B");
        let update = SqliteDialect
            .compile_update(&order, &update_command)
            .unwrap();
        assert_eq!(
            update.sql,
            "UPDATE orders SET name = ?, version = ? WHERE id = ? AND version = ?"
        );

        let delete_command = DeleteCommand::new("Order", 1_u64).expected_version(3);
        let delete = SqliteDialect
            .compile_delete(&order, &delete_command)
            .unwrap();
        let recover_command = RecoverCommand::new("Order", 1_u64, -4);
        let recover = SqliteDialect
            .compile_recover(&order, &recover_command)
            .unwrap();
        assert_eq!(
            delete.sql,
            "UPDATE orders SET version = ? WHERE id = ? AND version = ?"
        );
        assert_eq!(
            recover.sql,
            "UPDATE orders SET version = ? WHERE id = ? AND version = ?"
        );

        let create = SqliteDialect.compile_create_table(&order).unwrap();
        assert_eq!(
            create,
            "CREATE TABLE IF NOT EXISTS orders (id INTEGER PRIMARY KEY NOT NULL, version INTEGER NOT NULL, name TEXT)"
        );
    }

    #[test]
    fn sqlite_executor_ensures_schema_and_roundtrips_rows() {
        let executor = in_memory_executor();
        let order = entity();
        let mut ctx = UserContext::new()
            .with_metadata(InMemoryMetadataStore::new().with_entity(order.clone()));

        ctx.use_sqlite_provider(executor.clone());
        ensure_sqlite_schema_for(&ctx).unwrap();

        let insert_command = InsertCommand::new("Order")
            .value("id", 1_u64)
            .value("version", 1_i64)
            .value("name", "draft");
        let insert = SqliteDialect
            .compile_insert(&order, &insert_command)
            .unwrap();
        assert_eq!(executor.execute(&insert).unwrap(), 1);

        let lookup = SelectQuery::new("Order")
            .filter(Expr::eq("id", 1_u64))
            .order_asc("id");
        let select = SqliteDialect.compile_select(&order, &lookup).unwrap();
        let rows = executor.fetch_all(&select).unwrap();
        assert_eq!(rows.len(), 1);

        let stored = &rows[0];
        assert_eq!(stored.get("id"), Some(&Value::I64(1)));
        assert_eq!(stored.get("version"), Some(&Value::I64(1)));
        assert_eq!(stored.get("name"), Some(&Value::Text("draft".to_owned())));
    }

    #[test]
    fn sqlite_executor_parses_json_only_for_json_columns() {
        let executor = in_memory_executor();

        executor
            .execute(&raw_query(
                "CREATE TABLE payloads (text_payload TEXT, json_payload JSON)",
                Vec::new(),
            ))
            .unwrap();
        executor
            .execute(&raw_query(
                "INSERT INTO payloads (text_payload, json_payload) VALUES (?, ?)",
                vec![
                    Value::Text("{\"active\":true}".to_owned()),
                    Value::Json(serde_json::json!({"active": true})),
                ],
            ))
            .unwrap();

        let rows = executor
            .fetch_all(&raw_query(
                "SELECT text_payload, json_payload FROM payloads",
                Vec::new(),
            ))
            .unwrap();

        let stored = &rows[0];
        assert_eq!(
            stored.get("text_payload"),
            Some(&Value::Text("{\"active\":true}".to_owned()))
        );
        assert_eq!(
            stored.get("json_payload"),
            Some(&Value::Json(serde_json::json!({"active": true})))
        );
    }

    #[test]
    fn sqlite_id_space_generator_increments_ids() {
        let generator = SqliteIdSpaceGenerator::from_executor(in_memory_executor());
        let first = generator.next_id("Order").unwrap();
        let second = generator.next_id("Order").unwrap();
        assert_eq!(first, 1);
        assert_eq!(second, 2);
    }
}