Skip to main content

shaperail_runtime/db/
mongo.rs

1//! MongoDB backend (M14). Provides CRUD operations via the mongodb driver.
2//!
3//! Resources routed to a MongoDB connection use BSON documents instead of SQL rows.
4//! Schema validation is applied via MongoDB's JSON Schema validator on collection creation.
5
6use mongodb::bson::{doc, Bson, Document};
7use mongodb::options::{ClientOptions, FindOptions};
8use mongodb::Client;
9use serde_json::{Map, Value};
10use std::sync::Arc;
11
12use shaperail_core::{FieldSchema, FieldType, ResourceDefinition, ShaperailError};
13
14use super::filter::FilterSet;
15use super::pagination::PageRequest;
16use super::search::SearchParam;
17use super::sort::SortParam;
18use super::store::ResourceStore;
19use super::ResourceRow;
20
21/// A MongoDB connection wrapping a database handle.
22#[derive(Clone)]
23pub struct MongoConnection {
24    pub db: mongodb::Database,
25    pub client: Arc<Client>,
26}
27
28impl MongoConnection {
29    /// Connect to MongoDB from a URL. The database name is extracted from the URL path.
30    pub async fn connect(url: &str) -> Result<Self, ShaperailError> {
31        let opts = ClientOptions::parse(url)
32            .await
33            .map_err(|e| ShaperailError::Internal(format!("Failed to parse MongoDB URL: {e}")))?;
34        let db_name = opts
35            .default_database
36            .clone()
37            .unwrap_or_else(|| "shaperail".to_string());
38        let client = Client::with_options(opts).map_err(|e| {
39            ShaperailError::Internal(format!("Failed to create MongoDB client: {e}"))
40        })?;
41        let db = client.database(&db_name);
42        // Verify connectivity.
43        db.run_command(doc! { "ping": 1 })
44            .await
45            .map_err(|e| ShaperailError::Internal(format!("Failed to connect to MongoDB: {e}")))?;
46        tracing::info!("Connected to MongoDB database '{db_name}'");
47        Ok(Self {
48            db,
49            client: Arc::new(client),
50        })
51    }
52
53    /// Ensure the collection exists with JSON Schema validation from the resource definition.
54    pub async fn ensure_collection(
55        &self,
56        resource: &ResourceDefinition,
57    ) -> Result<(), ShaperailError> {
58        let collection_name = &resource.resource;
59        let schema = build_json_schema(resource);
60
61        // Try to create collection with validator; ignore "already exists" errors.
62        let create_opts = doc! {
63            "create": collection_name,
64            "validator": {
65                "$jsonSchema": schema.clone()
66            }
67        };
68        match self.db.run_command(create_opts).await {
69            Ok(_) => {
70                tracing::info!(
71                    "Created MongoDB collection '{collection_name}' with schema validation"
72                );
73            }
74            Err(e) => {
75                let err_str = e.to_string();
76                if err_str.contains("already exists") {
77                    // Update validator on existing collection.
78                    let modify = doc! {
79                        "collMod": collection_name,
80                        "validator": {
81                            "$jsonSchema": schema
82                        }
83                    };
84                    self.db.run_command(modify).await.map_err(|e| {
85                        ShaperailError::Internal(format!(
86                            "Failed to update MongoDB schema for '{collection_name}': {e}"
87                        ))
88                    })?;
89                } else {
90                    return Err(ShaperailError::Internal(format!(
91                        "Failed to create MongoDB collection '{collection_name}': {e}"
92                    )));
93                }
94            }
95        }
96        Ok(())
97    }
98}
99
100/// Build a MongoDB JSON Schema document from a resource definition.
101fn build_json_schema(resource: &ResourceDefinition) -> Document {
102    let mut properties = Document::new();
103    let mut required = Vec::new();
104
105    for (name, field) in &resource.schema {
106        let bson_type = field_type_to_bson_type(&field.field_type);
107        let mut prop = doc! { "bsonType": bson_type };
108
109        if let Some(values) = &field.values {
110            let bson_values: Vec<Bson> = values.iter().map(|v| Bson::String(v.clone())).collect();
111            prop.insert("enum", bson_values);
112        }
113        if let Some(min) = &field.min {
114            if let Some(n) = min.as_i64().or_else(|| min.as_f64().map(|f| f as i64)) {
115                if matches!(
116                    field.field_type,
117                    FieldType::String | FieldType::Enum | FieldType::File
118                ) {
119                    prop.insert("minLength", n);
120                } else {
121                    prop.insert("minimum", n);
122                }
123            }
124        }
125        if let Some(max) = &field.max {
126            if let Some(n) = max.as_i64().or_else(|| max.as_f64().map(|f| f as i64)) {
127                if matches!(
128                    field.field_type,
129                    FieldType::String | FieldType::Enum | FieldType::File
130                ) {
131                    prop.insert("maxLength", n);
132                } else {
133                    prop.insert("maximum", n);
134                }
135            }
136        }
137        properties.insert(name.clone(), prop);
138
139        if field.required && !field.generated {
140            required.push(Bson::String(name.clone()));
141        }
142    }
143
144    let mut schema = doc! {
145        "bsonType": "object",
146        "properties": properties,
147    };
148    if !required.is_empty() {
149        schema.insert("required", required);
150    }
151    schema
152}
153
154fn field_type_to_bson_type(ft: &FieldType) -> &'static str {
155    match ft {
156        FieldType::Uuid | FieldType::String | FieldType::Enum | FieldType::File => "string",
157        FieldType::Integer => "int",
158        FieldType::Bigint => "long",
159        FieldType::Number => "double",
160        FieldType::Boolean => "bool",
161        FieldType::Timestamp | FieldType::Date => "string",
162        FieldType::Json => "object",
163        FieldType::Array => "array",
164    }
165}
166
167/// Convert a JSON value to a BSON value for a given field schema.
168fn json_to_bson(value: &Value, field: &FieldSchema) -> Bson {
169    if value.is_null() {
170        return Bson::Null;
171    }
172    match field.field_type {
173        FieldType::Uuid | FieldType::String | FieldType::Enum | FieldType::File => {
174            Bson::String(value.as_str().unwrap_or(&value.to_string()).to_string())
175        }
176        FieldType::Integer => Bson::Int32(value.as_i64().unwrap_or(0) as i32),
177        FieldType::Bigint => Bson::Int64(value.as_i64().unwrap_or(0)),
178        FieldType::Number => Bson::Double(value.as_f64().unwrap_or(0.0)),
179        FieldType::Boolean => Bson::Boolean(value.as_bool().unwrap_or(false)),
180        FieldType::Timestamp | FieldType::Date => {
181            Bson::String(value.as_str().unwrap_or(&value.to_string()).to_string())
182        }
183        FieldType::Json => mongodb::bson::to_bson(value).unwrap_or(Bson::Null),
184        FieldType::Array => {
185            if let Some(arr) = value.as_array() {
186                Bson::Array(
187                    arr.iter()
188                        .map(|v| mongodb::bson::to_bson(v).unwrap_or(Bson::Null))
189                        .collect(),
190                )
191            } else {
192                Bson::Null
193            }
194        }
195    }
196}
197
198/// Convert a BSON value to JSON for a given field type.
199fn bson_to_json(bson: &Bson, _field: &FieldSchema) -> Value {
200    match bson {
201        Bson::Null => Value::Null,
202        Bson::String(s) => Value::String(s.clone()),
203        Bson::Int32(n) => Value::Number((*n as i64).into()),
204        Bson::Int64(n) => Value::Number((*n).into()),
205        Bson::Double(n) => serde_json::Number::from_f64(*n)
206            .map(Value::Number)
207            .unwrap_or(Value::Null),
208        Bson::Boolean(b) => Value::Bool(*b),
209        Bson::Document(d) => {
210            mongodb::bson::from_document::<Value>(d.clone()).unwrap_or(Value::Null)
211        }
212        Bson::Array(arr) => Value::Array(arr.iter().map(|b| bson_to_json(b, _field)).collect()),
213        _ => Value::String(bson.to_string()),
214    }
215}
216
217/// Convert a BSON document to a JSON object using the resource schema.
218fn doc_to_json(doc: &Document, resource: &ResourceDefinition) -> Value {
219    let mut obj = Map::new();
220    for (name, field) in &resource.schema {
221        let bson_val = doc.get(name).unwrap_or(&Bson::Null);
222        obj.insert(name.clone(), bson_to_json(bson_val, field));
223    }
224    Value::Object(obj)
225}
226
227/// MongoDB-backed resource store (M14).
228pub struct MongoBackedStore {
229    resource: Arc<ResourceDefinition>,
230    connection: MongoConnection,
231}
232
233impl MongoBackedStore {
234    pub fn new(resource: Arc<ResourceDefinition>, connection: MongoConnection) -> Self {
235        Self {
236            resource,
237            connection,
238        }
239    }
240
241    fn collection(&self) -> mongodb::Collection<Document> {
242        self.connection.db.collection(&self.resource.resource)
243    }
244
245    fn primary_key(&self) -> &str {
246        self.resource
247            .schema
248            .iter()
249            .find(|(_, fs)| fs.primary)
250            .map(|(name, _)| name.as_str())
251            .unwrap_or("id")
252    }
253
254    fn has_soft_delete(&self) -> bool {
255        self.resource
256            .endpoints
257            .as_ref()
258            .map(|eps| eps.values().any(|ep| ep.soft_delete))
259            .unwrap_or(false)
260    }
261
262    fn not_deleted_filter(&self) -> Document {
263        if self.has_soft_delete() {
264            doc! { "deleted_at": Bson::Null }
265        } else {
266            doc! {}
267        }
268    }
269}
270
271#[async_trait::async_trait]
272impl ResourceStore for MongoBackedStore {
273    fn resource_name(&self) -> &str {
274        &self.resource.resource
275    }
276
277    async fn find_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
278        let pk = self.primary_key();
279        let mut filter = self.not_deleted_filter();
280        filter.insert(pk, id.to_string());
281
282        let result = self
283            .collection()
284            .find_one(filter)
285            .await
286            .map_err(|e| ShaperailError::Internal(format!("MongoDB find_by_id failed: {e}")))?;
287
288        match result {
289            Some(doc) => {
290                let json = doc_to_json(&doc, &self.resource);
291                Ok(ResourceRow(json))
292            }
293            None => Err(ShaperailError::NotFound),
294        }
295    }
296
297    async fn find_all(
298        &self,
299        _endpoint: &shaperail_core::EndpointSpec,
300        filters: &FilterSet,
301        _search: Option<&SearchParam>,
302        sort: &SortParam,
303        page: &PageRequest,
304    ) -> Result<(Vec<ResourceRow>, Value), ShaperailError> {
305        let mut filter = self.not_deleted_filter();
306
307        for f in &filters.filters {
308            if let Some(field) = self.resource.schema.get(&f.field) {
309                let bson_val = json_to_bson(&Value::String(f.value.clone()), field);
310                filter.insert(f.field.clone(), bson_val);
311            }
312        }
313
314        let mut sort_doc = Document::new();
315        for s in &sort.fields {
316            let dir = match s.direction {
317                super::sort::SortDirection::Asc => 1,
318                super::sort::SortDirection::Desc => -1,
319            };
320            sort_doc.insert(s.field.clone(), dir);
321        }
322
323        match page {
324            PageRequest::Cursor { after, limit } => {
325                if let Some(cursor) = after {
326                    let id_str = super::pagination::decode_cursor(cursor)?;
327                    let pk = self.primary_key();
328                    filter.insert(pk, doc! { "$gt": id_str });
329                }
330                if sort_doc.is_empty() {
331                    sort_doc.insert(self.primary_key(), 1);
332                }
333                let opts = FindOptions::builder()
334                    .sort(sort_doc)
335                    .limit(Some(*limit + 1))
336                    .build();
337                let mut cursor = self
338                    .collection()
339                    .find(filter)
340                    .with_options(opts)
341                    .await
342                    .map_err(|e| {
343                        ShaperailError::Internal(format!("MongoDB find_all failed: {e}"))
344                    })?;
345
346                let mut rows = Vec::new();
347                while cursor
348                    .advance()
349                    .await
350                    .map_err(|e| ShaperailError::Internal(format!("MongoDB cursor error: {e}")))?
351                {
352                    let doc = cursor.deserialize_current().map_err(|e| {
353                        ShaperailError::Internal(format!("MongoDB deserialize error: {e}"))
354                    })?;
355                    rows.push(doc);
356                }
357
358                let has_more = rows.len() as i64 > *limit;
359                let result_rows: Vec<ResourceRow> = rows
360                    .into_iter()
361                    .take(*limit as usize)
362                    .map(|d| ResourceRow(doc_to_json(&d, &self.resource)))
363                    .collect();
364                let cursor_val = result_rows
365                    .last()
366                    .and_then(|r| r.0.get("id"))
367                    .and_then(|v| v.as_str())
368                    .map(super::pagination::encode_cursor);
369                let meta = serde_json::json!({ "cursor": cursor_val, "has_more": has_more });
370                Ok((result_rows, meta))
371            }
372            PageRequest::Offset { offset, limit } => {
373                let opts = FindOptions::builder()
374                    .sort(if sort_doc.is_empty() {
375                        None
376                    } else {
377                        Some(sort_doc)
378                    })
379                    .skip(Some(*offset as u64))
380                    .limit(Some(*limit))
381                    .build();
382                let mut cursor = self
383                    .collection()
384                    .find(filter)
385                    .with_options(opts)
386                    .await
387                    .map_err(|e| {
388                        ShaperailError::Internal(format!("MongoDB find_all failed: {e}"))
389                    })?;
390
391                let mut result_rows = Vec::new();
392                while cursor
393                    .advance()
394                    .await
395                    .map_err(|e| ShaperailError::Internal(format!("MongoDB cursor error: {e}")))?
396                {
397                    let doc = cursor.deserialize_current().map_err(|e| {
398                        ShaperailError::Internal(format!("MongoDB deserialize error: {e}"))
399                    })?;
400                    result_rows.push(ResourceRow(doc_to_json(&doc, &self.resource)));
401                }
402                let total = result_rows.len() as i64;
403                let meta = serde_json::json!({
404                    "offset": offset,
405                    "limit": limit,
406                    "total": total
407                });
408                Ok((result_rows, meta))
409            }
410        }
411    }
412
413    async fn insert(&self, data: &Map<String, Value>) -> Result<ResourceRow, ShaperailError> {
414        let mut doc = Document::new();
415        let mut generated_id: Option<String> = None;
416
417        for (name, field) in &self.resource.schema {
418            if field.generated {
419                match field.field_type {
420                    FieldType::Uuid => {
421                        let id = uuid::Uuid::new_v4().to_string();
422                        doc.insert(name.clone(), Bson::String(id.clone()));
423                        if field.primary {
424                            generated_id = Some(id);
425                        }
426                    }
427                    FieldType::Timestamp => {
428                        doc.insert(name.clone(), Bson::String(chrono::Utc::now().to_rfc3339()));
429                    }
430                    _ => {}
431                }
432                continue;
433            }
434            if let Some(value) = data.get(name) {
435                doc.insert(name.clone(), json_to_bson(value, field));
436            } else if let Some(default) = &field.default {
437                doc.insert(name.clone(), json_to_bson(default, field));
438            }
439        }
440
441        self.collection()
442            .insert_one(&doc)
443            .await
444            .map_err(|e| ShaperailError::Internal(format!("MongoDB insert failed: {e}")))?;
445
446        // Return the inserted document.
447        if let Some(id_str) = generated_id {
448            let pk = self.primary_key();
449            let filter = doc! { pk: &id_str };
450            let result = self.collection().find_one(filter).await.map_err(|e| {
451                ShaperailError::Internal(format!("MongoDB find after insert failed: {e}"))
452            })?;
453            match result {
454                Some(d) => Ok(ResourceRow(doc_to_json(&d, &self.resource))),
455                None => Ok(ResourceRow(doc_to_json(&doc, &self.resource))),
456            }
457        } else {
458            Ok(ResourceRow(doc_to_json(&doc, &self.resource)))
459        }
460    }
461
462    async fn update_by_id(
463        &self,
464        id: &uuid::Uuid,
465        data: &Map<String, Value>,
466    ) -> Result<ResourceRow, ShaperailError> {
467        let pk = self.primary_key();
468        let mut filter = self.not_deleted_filter();
469        filter.insert(pk, id.to_string());
470
471        let mut set_doc = Document::new();
472        for (name, value) in data {
473            if let Some(field) = self.resource.schema.get(name) {
474                if field.primary || field.generated {
475                    continue;
476                }
477                set_doc.insert(name.clone(), json_to_bson(value, field));
478            }
479        }
480        if self.resource.schema.contains_key("updated_at") {
481            set_doc.insert("updated_at", Bson::String(chrono::Utc::now().to_rfc3339()));
482        }
483        if set_doc.is_empty() {
484            return Err(ShaperailError::Validation(vec![
485                shaperail_core::FieldError {
486                    field: "body".to_string(),
487                    message: "No valid fields to update".to_string(),
488                    code: "empty_update".to_string(),
489                },
490            ]));
491        }
492
493        let update = doc! { "$set": set_doc };
494        let result = self
495            .collection()
496            .update_one(filter.clone(), update)
497            .await
498            .map_err(|e| ShaperailError::Internal(format!("MongoDB update failed: {e}")))?;
499
500        if result.matched_count == 0 {
501            return Err(ShaperailError::NotFound);
502        }
503
504        // Fetch updated document.
505        let updated = self
506            .collection()
507            .find_one(filter)
508            .await
509            .map_err(|e| {
510                ShaperailError::Internal(format!("MongoDB find after update failed: {e}"))
511            })?
512            .ok_or(ShaperailError::NotFound)?;
513        Ok(ResourceRow(doc_to_json(&updated, &self.resource)))
514    }
515
516    async fn soft_delete_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
517        let pk = self.primary_key();
518        let mut filter = self.not_deleted_filter();
519        filter.insert(pk, id.to_string());
520
521        let update = doc! {
522            "$set": {
523                "deleted_at": chrono::Utc::now().to_rfc3339()
524            }
525        };
526        let result = self
527            .collection()
528            .update_one(filter.clone(), update)
529            .await
530            .map_err(|e| ShaperailError::Internal(format!("MongoDB soft_delete failed: {e}")))?;
531
532        if result.matched_count == 0 {
533            return Err(ShaperailError::NotFound);
534        }
535
536        // Return with deleted_at set — remove not-deleted filter.
537        let pk_filter = doc! { pk: id.to_string() };
538        let doc = self
539            .collection()
540            .find_one(pk_filter)
541            .await
542            .map_err(|e| {
543                ShaperailError::Internal(format!("MongoDB find after soft_delete failed: {e}"))
544            })?
545            .ok_or(ShaperailError::NotFound)?;
546        Ok(ResourceRow(doc_to_json(&doc, &self.resource)))
547    }
548
549    async fn hard_delete_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
550        let row = self.find_by_id(id).await?;
551        let pk = self.primary_key();
552        let filter = doc! { pk: id.to_string() };
553
554        self.collection()
555            .delete_one(filter)
556            .await
557            .map_err(|e| ShaperailError::Internal(format!("MongoDB hard_delete failed: {e}")))?;
558
559        Ok(row)
560    }
561}