1use crate::{
28 Aggregate, Database, Error, FilterOperator, PaginatedResult, Pagination, QueryBuilder, Result,
29 SearchFilter, Sort,
30};
31use std::collections::HashMap;
32
33use serde::{de::DeserializeOwned, Serialize};
34
35fn mask_id(id: i64) -> String {
37 if id < 100 {
38 return "*".repeat(id.to_string().len());
39 }
40 let id_str = id.to_string();
41 let visible_digits = 2;
42 let masked_digits = id_str.len() - visible_digits;
43 format!("{}{}", &id_str[..visible_digits], "*".repeat(masked_digits))
44}
45
46#[allow(async_fn_in_trait)]
48pub trait Model: Serialize + DeserializeOwned + Send + Sync + Clone {
49 fn table_name() -> &'static str;
51
52 fn primary_key() -> &'static str {
54 "id"
55 }
56
57 fn get_primary_key(&self) -> Option<i64>;
59
60 fn set_primary_key(&mut self, id: i64);
62
63 fn columns() -> Vec<&'static str>;
65
66 fn migration_sql() -> String;
68
69 fn to_map(&self) -> Result<HashMap<String, crate::Value>>;
71
72 fn from_map(map: HashMap<String, crate::Value>) -> Result<Self>;
74
75 async fn create(&self, db: &Database) -> Result<Self> {
77 let map = self.to_map()?;
78 let columns: Vec<String> = map.keys().cloned().collect();
79 let values: Vec<String> = map.keys().map(|_| "?".to_string()).collect();
80
81 let sql = format!(
82 "INSERT INTO {} ({}) VALUES ({})",
83 Self::table_name(),
84 columns.join(", "),
85 values.join(", ")
86 );
87
88 Self::log_info(&format!("Creating record in table: {}", Self::table_name()));
89 Self::log_debug(&format!("SQL: {sql}"));
90
91 let params: Vec<libsql::Value> = map
92 .values()
93 .map(|v| Self::value_to_libsql_value(v))
94 .collect();
95
96 db.inner.execute(&sql, params).await?;
97 let id = 1i64; let mut result = self.clone();
100 result.set_primary_key(id);
101
102 Self::log_info(&format!(
103 "Successfully created record with ID: {}",
104 mask_id(id)
105 ));
106 Ok(result)
107 }
108
109 async fn create_or_update(&self, db: &Database) -> Result<Self> {
111 if let Some(id) = self.get_primary_key() {
112 Self::log_info(&format!(
113 "Updating existing record with ID: {}",
114 mask_id(id)
115 ));
116 match Self::find_by_id(id, db).await? {
118 Some(_) => {
119 self.update(db).await
121 }
122 None => {
123 Self::log_warn(&format!(
125 "Record with ID {} not found, creating new record",
126 mask_id(id)
127 ));
128 self.create(db).await
129 }
130 }
131 } else {
132 Self::log_info("Creating new record (no primary key provided)");
134 self.create(db).await
135 }
136 }
137
138 async fn upsert(&self, unique_columns: &[&str], db: &Database) -> Result<Self> {
140 let map = self.to_map()?;
141
142 let mut where_conditions = Vec::new();
144 let mut where_params = Vec::new();
145
146 for &column in unique_columns {
147 if let Some(value) = map.get(column) {
148 where_conditions.push(format!("{column} = ?"));
149 where_params.push(Self::value_to_libsql_value(value));
150 }
151 }
152
153 if where_conditions.is_empty() {
154 return Err(Error::Validation(
155 "No unique columns provided for upsert".to_string(),
156 ));
157 }
158
159 let where_clause = where_conditions.join(" AND ");
160 let sql = format!(
161 "SELECT {} FROM {} WHERE {}",
162 Self::primary_key(),
163 Self::table_name(),
164 where_clause
165 );
166
167 Self::log_info(&format!(
168 "Checking for existing record in table: {}",
169 Self::table_name()
170 ));
171 Self::log_debug(&format!("SQL: {sql}"));
172
173 let mut rows = db.inner.query(&sql, where_params).await?;
174
175 if let Some(row) = rows.next().await? {
176 if let Some(existing_id) = row.get_value(0).ok().and_then(|v| match v {
178 libsql::Value::Integer(i) => Some(i),
179 _ => None,
180 }) {
181 Self::log_info(&format!(
182 "Found existing record with ID: {}, updating",
183 mask_id(existing_id)
184 ));
185 let mut updated_self = self.clone();
186 updated_self.set_primary_key(existing_id);
187 updated_self.update(db).await
188 } else {
189 Err(Error::Query(
190 "Failed to get primary key from existing record".to_string(),
191 ))
192 }
193 } else {
194 Self::log_info("No existing record found, creating new one");
196 self.create(db).await
197 }
198 }
199
200 async fn bulk_create(models: &[Self], db: &Database) -> Result<Vec<Self>> {
202 if models.is_empty() {
203 return Ok(Vec::new());
204 }
205
206 let mut results = Vec::new();
207 db.inner
209 .execute("BEGIN", vec![libsql::Value::Null; 0])
210 .await?;
211
212 for model in models {
213 let map = model.to_map()?;
214 let columns: Vec<String> = map.keys().cloned().collect();
215 let values: Vec<String> = map.keys().map(|_| "?".to_string()).collect();
216
217 let sql = format!(
218 "INSERT INTO {} ({}) VALUES ({})",
219 Self::table_name(),
220 columns.join(", "),
221 values.join(", ")
222 );
223
224 let params: Vec<libsql::Value> = map
225 .values()
226 .map(|v| Self::value_to_libsql_value(v))
227 .collect();
228
229 db.inner.execute(&sql, params).await?;
230 let id = 1i64; let mut result = model.clone();
233 result.set_primary_key(id);
234 results.push(result);
235 }
236
237 db.inner
238 .execute("COMMIT", vec![libsql::Value::Null; 0])
239 .await?;
240 Ok(results)
241 }
242
243 async fn find_by_id(id: i64, db: &Database) -> Result<Option<Self>> {
245 let sql = format!(
246 "SELECT * FROM {} WHERE {} = ?",
247 Self::table_name(),
248 Self::primary_key()
249 );
250
251 Self::log_debug(&format!("Finding record by ID: {}", mask_id(id)));
252 Self::log_debug(&format!("SQL: {sql}"));
253
254 let mut rows = db
255 .inner
256 .query(&sql, vec![libsql::Value::Integer(id)])
257 .await?;
258
259 if let Some(row) = rows.next().await? {
260 let map = Self::row_to_map(&row)?;
261 Self::log_debug(&format!("Found record with ID: {}", mask_id(id)));
262 Ok(Some(Self::from_map(map)?))
263 } else {
264 Self::log_debug(&format!("No record found with ID: {}", mask_id(id)));
265 Ok(None)
266 }
267 }
268
269 async fn find_one(filter: FilterOperator, db: &Database) -> Result<Option<Self>> {
271 let builder = QueryBuilder::new(Self::table_name())
272 .r#where(filter)
273 .limit(1);
274
275 let results = builder.execute::<Self>(db).await?;
276 Ok(results.into_iter().next())
277 }
278
279 async fn find_all(db: &Database) -> Result<Vec<Self>> {
281 let builder = QueryBuilder::new(Self::table_name());
282 builder.execute::<Self>(db).await
283 }
284
285 async fn find_where(filter: FilterOperator, db: &Database) -> Result<Vec<Self>> {
287 let builder = QueryBuilder::new(Self::table_name()).r#where(filter);
288 builder.execute::<Self>(db).await
289 }
290
291 async fn find_paginated(
293 pagination: &Pagination,
294 db: &Database,
295 ) -> Result<PaginatedResult<Self>> {
296 let builder = QueryBuilder::new(Self::table_name());
297 builder.execute_paginated::<Self>(db, pagination).await
298 }
299
300 async fn find_where_paginated(
302 filter: FilterOperator,
303 pagination: &Pagination,
304 db: &Database,
305 ) -> Result<PaginatedResult<Self>> {
306 let builder = QueryBuilder::new(Self::table_name()).r#where(filter);
307 builder.execute_paginated::<Self>(db, pagination).await
308 }
309
310 async fn search(
312 search_filter: &SearchFilter,
313 pagination: Option<&Pagination>,
314 db: &Database,
315 ) -> Result<PaginatedResult<Self>> {
316 let filter = search_filter.to_filter_operator();
317 let pagination = pagination.unwrap_or(&Pagination::default()).clone();
318
319 Self::find_where_paginated(filter, &pagination, db).await
320 }
321
322 async fn count(db: &Database) -> Result<u64> {
324 let sql = format!("SELECT COUNT(*) FROM {}", Self::table_name());
325 let mut rows = db.inner.query(&sql, vec![libsql::Value::Null; 0]).await?;
326
327 if let Some(row) = rows.next().await? {
328 row.get_value(0)
329 .ok()
330 .and_then(|v| match v {
331 libsql::Value::Integer(i) => Some(i as u64),
332 _ => None,
333 })
334 .ok_or_else(|| Error::Query("Failed to get count".to_string()))
335 } else {
336 Err(Error::Query("No count result".to_string()))
337 }
338 }
339
340 async fn count_where(filter: FilterOperator, db: &Database) -> Result<u64> {
342 let builder = QueryBuilder::new(Self::table_name()).r#where(filter);
343
344 let (sql, params) = builder.build_count()?;
345 let mut rows = db.inner.query(&sql, params).await?;
346
347 if let Some(row) = rows.next().await? {
348 row.get_value(0)
349 .ok()
350 .and_then(|v| match v {
351 libsql::Value::Integer(i) => Some(i as u64),
352 _ => None,
353 })
354 .ok_or_else(|| Error::Query("Failed to get count".to_string()))
355 } else {
356 Err(Error::Query("No count result".to_string()))
357 }
358 }
359
360 async fn update(&self, db: &Database) -> Result<Self> {
362 let id = self.get_primary_key().ok_or_else(|| {
363 Error::Validation("Cannot update record without primary key".to_string())
364 })?;
365
366 let map = self.to_map()?;
367 let set_clauses: Vec<String> = map
368 .keys()
369 .filter(|&k| k != Self::primary_key())
370 .map(|k| format!("{k} = ?"))
371 .collect();
372
373 let sql = format!(
374 "UPDATE {} SET {} WHERE {} = ?",
375 Self::table_name(),
376 set_clauses.join(", "),
377 Self::primary_key()
378 );
379
380 Self::log_info(&format!("Updating record with ID: {}", mask_id(id)));
381 Self::log_debug(&format!("SQL: {sql}"));
382
383 let mut params: Vec<libsql::Value> = map
384 .iter()
385 .filter(|(k, _)| k != &Self::primary_key())
386 .map(|(_, v)| Self::value_to_libsql_value(v))
387 .collect();
388 params.push(libsql::Value::Integer(id));
389
390 db.inner.execute(&sql, params).await?;
391 Self::log_info(&format!(
392 "Successfully updated record with ID: {}",
393 mask_id(id)
394 ));
395 Ok(self.clone())
396 }
397
398 async fn bulk_update(models: &[Self], db: &Database) -> Result<Vec<Self>> {
400 if models.is_empty() {
401 return Ok(Vec::new());
402 }
403
404 let mut results = Vec::new();
405 db.inner
407 .execute("BEGIN", vec![libsql::Value::Null; 0])
408 .await?;
409
410 for model in models {
411 let result = model.update(db).await?;
412 results.push(result);
413 }
414
415 db.inner
416 .execute("COMMIT", vec![libsql::Value::Null; 0])
417 .await?;
418 Ok(results)
419 }
420
421 async fn delete(&self, db: &Database) -> Result<bool> {
423 let id = self.get_primary_key().ok_or_else(|| {
424 Error::Validation("Cannot delete record without primary key".to_string())
425 })?;
426
427 let sql = format!(
428 "DELETE FROM {} WHERE {} = ?",
429 Self::table_name(),
430 Self::primary_key()
431 );
432
433 Self::log_info(&format!("Deleting record with ID: {}", mask_id(id)));
434 Self::log_debug(&format!("SQL: {sql}"));
435
436 db.inner
437 .execute(&sql, vec![libsql::Value::Integer(id)])
438 .await?;
439 Self::log_info(&format!(
440 "Successfully deleted record with ID: {}",
441 mask_id(id)
442 ));
443 Ok(true)
444 }
445
446 async fn bulk_delete(ids: &[i64], db: &Database) -> Result<u64> {
448 if ids.is_empty() {
449 return Ok(0);
450 }
451
452 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
453 let sql = format!(
454 "DELETE FROM {} WHERE {} IN ({})",
455 Self::table_name(),
456 Self::primary_key(),
457 placeholders.join(", ")
458 );
459
460 let params: Vec<libsql::Value> = ids.iter().map(|&id| libsql::Value::Integer(id)).collect();
461 db.inner.execute(&sql, params).await?;
462 Ok(ids.len() as u64)
463 }
464
465 async fn delete_where(filter: FilterOperator, db: &Database) -> Result<u64> {
467 let builder = QueryBuilder::new(Self::table_name()).r#where(filter);
468
469 let (sql, params) = builder.build()?;
470 let delete_sql = sql.replace("SELECT *", "DELETE");
471 db.inner.execute(&delete_sql, params).await?;
472
473 Ok(1)
476 }
477
478 async fn list(
480 sort: Option<Vec<Sort>>,
481 pagination: Option<&Pagination>,
482 db: &Database,
483 ) -> Result<PaginatedResult<Self>> {
484 let mut builder = QueryBuilder::new(Self::table_name());
485
486 if let Some(sorts) = sort {
487 builder = builder.order_by_multiple(sorts);
488 }
489
490 let pagination = pagination.unwrap_or(&Pagination::default()).clone();
491 builder.execute_paginated::<Self>(db, &pagination).await
492 }
493
494 async fn list_where(
496 filter: FilterOperator,
497 sort: Option<Vec<Sort>>,
498 pagination: Option<&Pagination>,
499 db: &Database,
500 ) -> Result<PaginatedResult<Self>> {
501 let mut builder = QueryBuilder::new(Self::table_name()).r#where(filter);
502
503 if let Some(sorts) = sort {
504 builder = builder.order_by_multiple(sorts);
505 }
506
507 let pagination = pagination.unwrap_or(&Pagination::default()).clone();
508 builder.execute_paginated::<Self>(db, &pagination).await
509 }
510
511 async fn query(builder: QueryBuilder, db: &Database) -> Result<Vec<Self>> {
513 builder.execute::<Self>(db).await
514 }
515
516 async fn query_paginated(
518 builder: QueryBuilder,
519 pagination: &Pagination,
520 db: &Database,
521 ) -> Result<PaginatedResult<Self>> {
522 builder.execute_paginated::<Self>(db, pagination).await
523 }
524
525 async fn aggregate(
527 function: Aggregate,
528 column: &str,
529 filter: Option<FilterOperator>,
530 db: &Database,
531 ) -> Result<Option<f64>> {
532 let mut builder =
533 QueryBuilder::new(Self::table_name()).aggregate(function, column, None::<String>);
534
535 if let Some(filter) = filter {
536 builder = builder.r#where(filter);
537 }
538
539 let (sql, params) = builder.build()?;
540 let mut rows = db.inner.query(&sql, params).await?;
541
542 if let Some(row) = rows.next().await? {
543 let value = row
544 .get_value(0)
545 .ok()
546 .and_then(|v| match v {
547 libsql::Value::Integer(i) => Some(i as f64),
548 libsql::Value::Real(f) => Some(f),
549 _ => None,
550 })
551 .ok_or_else(|| Error::Query("Failed to get aggregate value".to_string()))?;
552 Ok(Some(value))
553 } else {
554 Ok(None)
555 }
556 }
557
558 fn row_to_map(row: &libsql::Row) -> Result<HashMap<String, crate::Value>> {
560 let mut map = HashMap::new();
561 for i in 0..row.column_count() {
562 if let Some(column_name) = row.column_name(i) {
563 let value = row.get_value(i).unwrap_or(libsql::Value::Null);
564 map.insert(column_name.to_string(), Self::libsql_value_to_value(&value));
565 }
566 }
567 Ok(map)
568 }
569
570 fn value_to_libsql_value(value: &crate::Value) -> libsql::Value {
572 match value {
573 crate::Value::Null => libsql::Value::Null,
574 crate::Value::Integer(i) => libsql::Value::Integer(*i),
575 crate::Value::Real(f) => libsql::Value::Real(*f),
576 crate::Value::Text(s) => libsql::Value::Text(s.clone()),
577 crate::Value::Blob(b) => libsql::Value::Blob(b.clone()),
578 crate::Value::Boolean(b) => libsql::Value::Integer(if *b { 1 } else { 0 }),
579 }
580 }
581
582 fn libsql_value_to_value(value: &libsql::Value) -> crate::Value {
584 match value {
585 libsql::Value::Null => crate::Value::Null,
586 libsql::Value::Integer(i) => crate::Value::Integer(*i),
587 libsql::Value::Real(f) => crate::Value::Real(*f),
588 libsql::Value::Text(s) => crate::Value::Text(s.clone()),
589 libsql::Value::Blob(b) => crate::Value::Blob(b.clone()),
590 }
591 }
592
593 fn log_info(message: &str) {
595 #[cfg(target_arch = "wasm32")]
596 {
597 web_sys::console::log_1(&format!("[INFO] {}: {}", Self::table_name(), message).into());
598 }
599 #[cfg(not(target_arch = "wasm32"))]
600 {
601 log::info!("[{}] {}", Self::table_name(), message);
602 }
603 }
604
605 fn log_debug(message: &str) {
607 #[cfg(target_arch = "wasm32")]
608 {
609 web_sys::console::log_1(&format!("[DEBUG] {}: {}", Self::table_name(), message).into());
610 }
611 #[cfg(not(target_arch = "wasm32"))]
612 {
613 log::debug!("[{}] {}", Self::table_name(), message);
614 }
615 }
616
617 fn log_warn(message: &str) {
619 #[cfg(target_arch = "wasm32")]
620 {
621 web_sys::console::warn_1(&format!("[WARN] {}: {}", Self::table_name(), message).into());
622 }
623 #[cfg(not(target_arch = "wasm32"))]
624 {
625 log::warn!("[{}] {}", Self::table_name(), message);
626 }
627 }
628
629 fn log_error(message: &str) {
631 #[cfg(target_arch = "wasm32")]
632 {
633 web_sys::console::error_1(
634 &format!("[ERROR] {}: {}", Self::table_name(), message).into(),
635 );
636 }
637 #[cfg(not(target_arch = "wasm32"))]
638 {
639 log::error!("[{}] {}", Self::table_name(), message);
640 }
641 }
642}