1use mongodb::bson::{doc, Bson, Document};
7use mongodb::options::{ClientOptions, FindOptions};
8use mongodb::Client;
9use serde_json::{Map, Value};
10use std::sync::Arc;
11
12use shaperail_core::{FieldSchema, FieldType, ResourceDefinition, ShaperailError};
13
14use super::filter::FilterSet;
15use super::pagination::PageRequest;
16use super::search::SearchParam;
17use super::sort::SortParam;
18use super::store::ResourceStore;
19use super::ResourceRow;
20
21#[derive(Clone)]
23pub struct MongoConnection {
24 pub db: mongodb::Database,
25 pub client: Arc<Client>,
26}
27
28impl MongoConnection {
29 pub async fn connect(url: &str) -> Result<Self, ShaperailError> {
31 let opts = ClientOptions::parse(url)
32 .await
33 .map_err(|e| ShaperailError::Internal(format!("Failed to parse MongoDB URL: {e}")))?;
34 let db_name = opts
35 .default_database
36 .clone()
37 .unwrap_or_else(|| "shaperail".to_string());
38 let client = Client::with_options(opts).map_err(|e| {
39 ShaperailError::Internal(format!("Failed to create MongoDB client: {e}"))
40 })?;
41 let db = client.database(&db_name);
42 db.run_command(doc! { "ping": 1 })
44 .await
45 .map_err(|e| ShaperailError::Internal(format!("Failed to connect to MongoDB: {e}")))?;
46 tracing::info!("Connected to MongoDB database '{db_name}'");
47 Ok(Self {
48 db,
49 client: Arc::new(client),
50 })
51 }
52
53 pub async fn ensure_collection(
55 &self,
56 resource: &ResourceDefinition,
57 ) -> Result<(), ShaperailError> {
58 let collection_name = &resource.resource;
59 let schema = build_json_schema(resource);
60
61 let create_opts = doc! {
63 "create": collection_name,
64 "validator": {
65 "$jsonSchema": schema.clone()
66 }
67 };
68 match self.db.run_command(create_opts).await {
69 Ok(_) => {
70 tracing::info!(
71 "Created MongoDB collection '{collection_name}' with schema validation"
72 );
73 }
74 Err(e) => {
75 let err_str = e.to_string();
76 if err_str.contains("already exists") {
77 let modify = doc! {
79 "collMod": collection_name,
80 "validator": {
81 "$jsonSchema": schema
82 }
83 };
84 self.db.run_command(modify).await.map_err(|e| {
85 ShaperailError::Internal(format!(
86 "Failed to update MongoDB schema for '{collection_name}': {e}"
87 ))
88 })?;
89 } else {
90 return Err(ShaperailError::Internal(format!(
91 "Failed to create MongoDB collection '{collection_name}': {e}"
92 )));
93 }
94 }
95 }
96 Ok(())
97 }
98}
99
100fn build_json_schema(resource: &ResourceDefinition) -> Document {
102 let mut properties = Document::new();
103 let mut required = Vec::new();
104
105 for (name, field) in &resource.schema {
106 let bson_type = field_type_to_bson_type(&field.field_type);
107 let mut prop = doc! { "bsonType": bson_type };
108
109 if let Some(values) = &field.values {
110 let bson_values: Vec<Bson> = values.iter().map(|v| Bson::String(v.clone())).collect();
111 prop.insert("enum", bson_values);
112 }
113 if let Some(min) = &field.min {
114 if let Some(n) = min.as_i64().or_else(|| min.as_f64().map(|f| f as i64)) {
115 if matches!(
116 field.field_type,
117 FieldType::String | FieldType::Enum | FieldType::File
118 ) {
119 prop.insert("minLength", n);
120 } else {
121 prop.insert("minimum", n);
122 }
123 }
124 }
125 if let Some(max) = &field.max {
126 if let Some(n) = max.as_i64().or_else(|| max.as_f64().map(|f| f as i64)) {
127 if matches!(
128 field.field_type,
129 FieldType::String | FieldType::Enum | FieldType::File
130 ) {
131 prop.insert("maxLength", n);
132 } else {
133 prop.insert("maximum", n);
134 }
135 }
136 }
137 properties.insert(name.clone(), prop);
138
139 if field.required && !field.generated {
140 required.push(Bson::String(name.clone()));
141 }
142 }
143
144 let mut schema = doc! {
145 "bsonType": "object",
146 "properties": properties,
147 };
148 if !required.is_empty() {
149 schema.insert("required", required);
150 }
151 schema
152}
153
154fn field_type_to_bson_type(ft: &FieldType) -> &'static str {
155 match ft {
156 FieldType::Uuid | FieldType::String | FieldType::Enum | FieldType::File => "string",
157 FieldType::Integer => "int",
158 FieldType::Bigint => "long",
159 FieldType::Number => "double",
160 FieldType::Boolean => "bool",
161 FieldType::Timestamp | FieldType::Date => "string",
162 FieldType::Json => "object",
163 FieldType::Array => "array",
164 }
165}
166
167fn json_to_bson(value: &Value, field: &FieldSchema) -> Bson {
169 if value.is_null() {
170 return Bson::Null;
171 }
172 match field.field_type {
173 FieldType::Uuid | FieldType::String | FieldType::Enum | FieldType::File => {
174 Bson::String(value.as_str().unwrap_or(&value.to_string()).to_string())
175 }
176 FieldType::Integer => Bson::Int32(value.as_i64().unwrap_or(0) as i32),
177 FieldType::Bigint => Bson::Int64(value.as_i64().unwrap_or(0)),
178 FieldType::Number => Bson::Double(value.as_f64().unwrap_or(0.0)),
179 FieldType::Boolean => Bson::Boolean(value.as_bool().unwrap_or(false)),
180 FieldType::Timestamp | FieldType::Date => {
181 Bson::String(value.as_str().unwrap_or(&value.to_string()).to_string())
182 }
183 FieldType::Json => mongodb::bson::to_bson(value).unwrap_or(Bson::Null),
184 FieldType::Array => {
185 if let Some(arr) = value.as_array() {
186 Bson::Array(
187 arr.iter()
188 .map(|v| mongodb::bson::to_bson(v).unwrap_or(Bson::Null))
189 .collect(),
190 )
191 } else {
192 Bson::Null
193 }
194 }
195 }
196}
197
198fn bson_to_json(bson: &Bson, _field: &FieldSchema) -> Value {
200 match bson {
201 Bson::Null => Value::Null,
202 Bson::String(s) => Value::String(s.clone()),
203 Bson::Int32(n) => Value::Number((*n as i64).into()),
204 Bson::Int64(n) => Value::Number((*n).into()),
205 Bson::Double(n) => serde_json::Number::from_f64(*n)
206 .map(Value::Number)
207 .unwrap_or(Value::Null),
208 Bson::Boolean(b) => Value::Bool(*b),
209 Bson::Document(d) => {
210 mongodb::bson::from_document::<Value>(d.clone()).unwrap_or(Value::Null)
211 }
212 Bson::Array(arr) => Value::Array(arr.iter().map(|b| bson_to_json(b, _field)).collect()),
213 _ => Value::String(bson.to_string()),
214 }
215}
216
217fn doc_to_json(doc: &Document, resource: &ResourceDefinition) -> Value {
219 let mut obj = Map::new();
220 for (name, field) in &resource.schema {
221 let bson_val = doc.get(name).unwrap_or(&Bson::Null);
222 obj.insert(name.clone(), bson_to_json(bson_val, field));
223 }
224 Value::Object(obj)
225}
226
227pub struct MongoBackedStore {
229 resource: Arc<ResourceDefinition>,
230 connection: MongoConnection,
231}
232
233impl MongoBackedStore {
234 pub fn new(resource: Arc<ResourceDefinition>, connection: MongoConnection) -> Self {
235 Self {
236 resource,
237 connection,
238 }
239 }
240
241 fn collection(&self) -> mongodb::Collection<Document> {
242 self.connection.db.collection(&self.resource.resource)
243 }
244
245 fn primary_key(&self) -> &str {
246 self.resource
247 .schema
248 .iter()
249 .find(|(_, fs)| fs.primary)
250 .map(|(name, _)| name.as_str())
251 .unwrap_or("id")
252 }
253
254 fn has_soft_delete(&self) -> bool {
255 self.resource
256 .endpoints
257 .as_ref()
258 .map(|eps| eps.values().any(|ep| ep.soft_delete))
259 .unwrap_or(false)
260 }
261
262 fn not_deleted_filter(&self) -> Document {
263 if self.has_soft_delete() {
264 doc! { "deleted_at": Bson::Null }
265 } else {
266 doc! {}
267 }
268 }
269}
270
271#[async_trait::async_trait]
272impl ResourceStore for MongoBackedStore {
273 fn resource_name(&self) -> &str {
274 &self.resource.resource
275 }
276
277 async fn find_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
278 let pk = self.primary_key();
279 let mut filter = self.not_deleted_filter();
280 filter.insert(pk, id.to_string());
281
282 let result = self
283 .collection()
284 .find_one(filter)
285 .await
286 .map_err(|e| ShaperailError::Internal(format!("MongoDB find_by_id failed: {e}")))?;
287
288 match result {
289 Some(doc) => {
290 let json = doc_to_json(&doc, &self.resource);
291 Ok(ResourceRow(json))
292 }
293 None => Err(ShaperailError::NotFound),
294 }
295 }
296
297 async fn find_all(
298 &self,
299 _endpoint: &shaperail_core::EndpointSpec,
300 filters: &FilterSet,
301 _search: Option<&SearchParam>,
302 sort: &SortParam,
303 page: &PageRequest,
304 ) -> Result<(Vec<ResourceRow>, Value), ShaperailError> {
305 let mut filter = self.not_deleted_filter();
306
307 for f in &filters.filters {
308 if let Some(field) = self.resource.schema.get(&f.field) {
309 let bson_val = json_to_bson(&Value::String(f.value.clone()), field);
310 filter.insert(f.field.clone(), bson_val);
311 }
312 }
313
314 let mut sort_doc = Document::new();
315 for s in &sort.fields {
316 let dir = match s.direction {
317 super::sort::SortDirection::Asc => 1,
318 super::sort::SortDirection::Desc => -1,
319 };
320 sort_doc.insert(s.field.clone(), dir);
321 }
322
323 match page {
324 PageRequest::Cursor { after, limit } => {
325 if let Some(cursor) = after {
326 let id_str = super::pagination::decode_cursor(cursor)?;
327 let pk = self.primary_key();
328 filter.insert(pk, doc! { "$gt": id_str });
329 }
330 if sort_doc.is_empty() {
331 sort_doc.insert(self.primary_key(), 1);
332 }
333 let opts = FindOptions::builder()
334 .sort(sort_doc)
335 .limit(Some(*limit + 1))
336 .build();
337 let mut cursor = self
338 .collection()
339 .find(filter)
340 .with_options(opts)
341 .await
342 .map_err(|e| {
343 ShaperailError::Internal(format!("MongoDB find_all failed: {e}"))
344 })?;
345
346 let mut rows = Vec::new();
347 while cursor
348 .advance()
349 .await
350 .map_err(|e| ShaperailError::Internal(format!("MongoDB cursor error: {e}")))?
351 {
352 let doc = cursor.deserialize_current().map_err(|e| {
353 ShaperailError::Internal(format!("MongoDB deserialize error: {e}"))
354 })?;
355 rows.push(doc);
356 }
357
358 let has_more = rows.len() as i64 > *limit;
359 let result_rows: Vec<ResourceRow> = rows
360 .into_iter()
361 .take(*limit as usize)
362 .map(|d| ResourceRow(doc_to_json(&d, &self.resource)))
363 .collect();
364 let cursor_val = result_rows
365 .last()
366 .and_then(|r| r.0.get("id"))
367 .and_then(|v| v.as_str())
368 .map(super::pagination::encode_cursor);
369 let meta = serde_json::json!({ "cursor": cursor_val, "has_more": has_more });
370 Ok((result_rows, meta))
371 }
372 PageRequest::Offset { offset, limit } => {
373 let opts = FindOptions::builder()
374 .sort(if sort_doc.is_empty() {
375 None
376 } else {
377 Some(sort_doc)
378 })
379 .skip(Some(*offset as u64))
380 .limit(Some(*limit))
381 .build();
382 let mut cursor = self
383 .collection()
384 .find(filter)
385 .with_options(opts)
386 .await
387 .map_err(|e| {
388 ShaperailError::Internal(format!("MongoDB find_all failed: {e}"))
389 })?;
390
391 let mut result_rows = Vec::new();
392 while cursor
393 .advance()
394 .await
395 .map_err(|e| ShaperailError::Internal(format!("MongoDB cursor error: {e}")))?
396 {
397 let doc = cursor.deserialize_current().map_err(|e| {
398 ShaperailError::Internal(format!("MongoDB deserialize error: {e}"))
399 })?;
400 result_rows.push(ResourceRow(doc_to_json(&doc, &self.resource)));
401 }
402 let total = result_rows.len() as i64;
403 let meta = serde_json::json!({
404 "offset": offset,
405 "limit": limit,
406 "total": total
407 });
408 Ok((result_rows, meta))
409 }
410 }
411 }
412
413 async fn insert(&self, data: &Map<String, Value>) -> Result<ResourceRow, ShaperailError> {
414 let mut doc = Document::new();
415 let mut generated_id: Option<String> = None;
416
417 for (name, field) in &self.resource.schema {
418 if field.generated {
419 match field.field_type {
420 FieldType::Uuid => {
421 let id = uuid::Uuid::new_v4().to_string();
422 doc.insert(name.clone(), Bson::String(id.clone()));
423 if field.primary {
424 generated_id = Some(id);
425 }
426 }
427 FieldType::Timestamp => {
428 doc.insert(name.clone(), Bson::String(chrono::Utc::now().to_rfc3339()));
429 }
430 _ => {}
431 }
432 continue;
433 }
434 if let Some(value) = data.get(name) {
435 doc.insert(name.clone(), json_to_bson(value, field));
436 } else if let Some(default) = &field.default {
437 doc.insert(name.clone(), json_to_bson(default, field));
438 }
439 }
440
441 self.collection()
442 .insert_one(&doc)
443 .await
444 .map_err(|e| ShaperailError::Internal(format!("MongoDB insert failed: {e}")))?;
445
446 if let Some(id_str) = generated_id {
448 let pk = self.primary_key();
449 let filter = doc! { pk: &id_str };
450 let result = self.collection().find_one(filter).await.map_err(|e| {
451 ShaperailError::Internal(format!("MongoDB find after insert failed: {e}"))
452 })?;
453 match result {
454 Some(d) => Ok(ResourceRow(doc_to_json(&d, &self.resource))),
455 None => Ok(ResourceRow(doc_to_json(&doc, &self.resource))),
456 }
457 } else {
458 Ok(ResourceRow(doc_to_json(&doc, &self.resource)))
459 }
460 }
461
462 async fn update_by_id(
463 &self,
464 id: &uuid::Uuid,
465 data: &Map<String, Value>,
466 ) -> Result<ResourceRow, ShaperailError> {
467 let pk = self.primary_key();
468 let mut filter = self.not_deleted_filter();
469 filter.insert(pk, id.to_string());
470
471 let mut set_doc = Document::new();
472 for (name, value) in data {
473 if let Some(field) = self.resource.schema.get(name) {
474 if field.primary || field.generated {
475 continue;
476 }
477 set_doc.insert(name.clone(), json_to_bson(value, field));
478 }
479 }
480 if self.resource.schema.contains_key("updated_at") {
481 set_doc.insert("updated_at", Bson::String(chrono::Utc::now().to_rfc3339()));
482 }
483 if set_doc.is_empty() {
484 return Err(ShaperailError::Validation(vec![
485 shaperail_core::FieldError {
486 field: "body".to_string(),
487 message: "No valid fields to update".to_string(),
488 code: "empty_update".to_string(),
489 },
490 ]));
491 }
492
493 let update = doc! { "$set": set_doc };
494 let result = self
495 .collection()
496 .update_one(filter.clone(), update)
497 .await
498 .map_err(|e| ShaperailError::Internal(format!("MongoDB update failed: {e}")))?;
499
500 if result.matched_count == 0 {
501 return Err(ShaperailError::NotFound);
502 }
503
504 let updated = self
506 .collection()
507 .find_one(filter)
508 .await
509 .map_err(|e| {
510 ShaperailError::Internal(format!("MongoDB find after update failed: {e}"))
511 })?
512 .ok_or(ShaperailError::NotFound)?;
513 Ok(ResourceRow(doc_to_json(&updated, &self.resource)))
514 }
515
516 async fn soft_delete_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
517 let pk = self.primary_key();
518 let mut filter = self.not_deleted_filter();
519 filter.insert(pk, id.to_string());
520
521 let update = doc! {
522 "$set": {
523 "deleted_at": chrono::Utc::now().to_rfc3339()
524 }
525 };
526 let result = self
527 .collection()
528 .update_one(filter.clone(), update)
529 .await
530 .map_err(|e| ShaperailError::Internal(format!("MongoDB soft_delete failed: {e}")))?;
531
532 if result.matched_count == 0 {
533 return Err(ShaperailError::NotFound);
534 }
535
536 let pk_filter = doc! { pk: id.to_string() };
538 let doc = self
539 .collection()
540 .find_one(pk_filter)
541 .await
542 .map_err(|e| {
543 ShaperailError::Internal(format!("MongoDB find after soft_delete failed: {e}"))
544 })?
545 .ok_or(ShaperailError::NotFound)?;
546 Ok(ResourceRow(doc_to_json(&doc, &self.resource)))
547 }
548
549 async fn hard_delete_by_id(&self, id: &uuid::Uuid) -> Result<ResourceRow, ShaperailError> {
550 let row = self.find_by_id(id).await?;
551 let pk = self.primary_key();
552 let filter = doc! { pk: id.to_string() };
553
554 self.collection()
555 .delete_one(filter)
556 .await
557 .map_err(|e| ShaperailError::Internal(format!("MongoDB hard_delete failed: {e}")))?;
558
559 Ok(row)
560 }
561}