Skip to main content

rouchdb_query/
mapreduce.rs

1//! Map/reduce view engine.
2//!
3//! Users define views by providing map functions (Rust closures) and optional
4//! reduce functions. The engine runs the map over all documents and collects
5//! key-value pairs, then optionally reduces them.
6
7use std::cmp::Ordering;
8
9use rouchdb_core::adapter::Adapter;
10use rouchdb_core::collation::collate;
11use rouchdb_core::document::AllDocsOptions;
12use rouchdb_core::error::Result;
13
14/// A key-value pair emitted by a map function.
15#[derive(Debug, Clone)]
16pub struct EmittedRow {
17    pub id: String,
18    pub key: serde_json::Value,
19    pub value: serde_json::Value,
20}
21
22/// Built-in reduce functions matching CouchDB's built-ins.
23pub enum ReduceFn {
24    /// Sum all numeric values.
25    Sum,
26    /// Count the number of rows.
27    Count,
28    /// Compute statistics (sum, count, min, max, sumsqr).
29    Stats,
30    /// Custom reduce function.
31    #[allow(clippy::type_complexity)]
32    Custom(Box<dyn Fn(&[serde_json::Value], &[serde_json::Value], bool) -> serde_json::Value>),
33}
34
35/// Options for querying a view.
36#[derive(Debug, Clone, Default)]
37pub struct ViewQueryOptions {
38    /// Only return rows with this exact key.
39    pub key: Option<serde_json::Value>,
40    /// Start of key range (inclusive).
41    pub start_key: Option<serde_json::Value>,
42    /// End of key range (inclusive by default).
43    pub end_key: Option<serde_json::Value>,
44    /// Whether to include the end_key in the range.
45    pub inclusive_end: bool,
46    /// Reverse the order.
47    pub descending: bool,
48    /// Number of rows to skip.
49    pub skip: u64,
50    /// Maximum number of rows.
51    pub limit: Option<u64>,
52    /// Include the full document in each row.
53    pub include_docs: bool,
54    /// Whether to run the reduce function.
55    pub reduce: bool,
56    /// Group by key (requires reduce).
57    pub group: bool,
58    /// Group to this many array elements of the key.
59    pub group_level: Option<u64>,
60}
61
62impl ViewQueryOptions {
63    pub fn new() -> Self {
64        Self {
65            inclusive_end: true,
66            ..Default::default()
67        }
68    }
69}
70
71/// Result of querying a view.
72#[derive(Debug, Clone)]
73pub struct ViewResult {
74    pub total_rows: u64,
75    pub offset: u64,
76    pub rows: Vec<ViewRow>,
77}
78
79/// A single row in a view result.
80#[derive(Debug, Clone)]
81pub struct ViewRow {
82    pub id: Option<String>,
83    pub key: serde_json::Value,
84    pub value: serde_json::Value,
85    pub doc: Option<serde_json::Value>,
86}
87
88/// Run a temporary (ad-hoc) map/reduce query.
89///
90/// The `map_fn` receives a document JSON and returns emitted key-value pairs.
91pub async fn query_view(
92    adapter: &dyn Adapter,
93    map_fn: &dyn Fn(&serde_json::Value) -> Vec<(serde_json::Value, serde_json::Value)>,
94    reduce_fn: Option<&ReduceFn>,
95    opts: ViewQueryOptions,
96) -> Result<ViewResult> {
97    // Run map over all documents
98    let all = adapter
99        .all_docs(AllDocsOptions {
100            include_docs: true,
101            ..AllDocsOptions::new()
102        })
103        .await?;
104
105    let mut emitted: Vec<EmittedRow> = Vec::new();
106
107    for row in &all.rows {
108        if let Some(ref doc_json) = row.doc {
109            let pairs = map_fn(doc_json);
110            for (key, value) in pairs {
111                emitted.push(EmittedRow {
112                    id: row.id.clone(),
113                    key,
114                    value,
115                });
116            }
117        }
118    }
119
120    // Sort by key using CouchDB collation
121    emitted.sort_by(|a, b| {
122        let cmp = collate(&a.key, &b.key);
123        if cmp == Ordering::Equal {
124            a.id.cmp(&b.id)
125        } else {
126            cmp
127        }
128    });
129
130    if opts.descending {
131        emitted.reverse();
132    }
133
134    // Filter by key range
135    let emitted = filter_by_range(emitted, &opts);
136
137    let total_rows = emitted.len() as u64;
138
139    // Reduce
140    if opts.reduce
141        && let Some(reduce) = reduce_fn {
142            let rows = if opts.group || opts.group_level.is_some() {
143                group_reduce(&emitted, reduce, opts.group_level)
144            } else {
145                let keys: Vec<serde_json::Value> = emitted.iter().map(|r| r.key.clone()).collect();
146                let values: Vec<serde_json::Value> =
147                    emitted.iter().map(|r| r.value.clone()).collect();
148                let result = apply_reduce(reduce, &keys, &values, false);
149                vec![ViewRow {
150                    id: None,
151                    key: serde_json::Value::Null,
152                    value: result,
153                    doc: None,
154                }]
155            };
156
157            return Ok(ViewResult {
158                total_rows: rows.len() as u64,
159                offset: 0,
160                rows,
161            });
162        }
163
164    // Apply skip and limit
165    let skip = opts.skip as usize;
166    let rows: Vec<ViewRow> = emitted
167        .into_iter()
168        .skip(skip)
169        .take(opts.limit.unwrap_or(u64::MAX) as usize)
170        .map(|r| ViewRow {
171            id: Some(r.id),
172            key: r.key,
173            value: r.value,
174            doc: None,
175        })
176        .collect();
177
178    Ok(ViewResult {
179        total_rows,
180        offset: opts.skip,
181        rows,
182    })
183}
184
185fn filter_by_range(rows: Vec<EmittedRow>, opts: &ViewQueryOptions) -> Vec<EmittedRow> {
186    rows.into_iter()
187        .filter(|r| {
188            if let Some(ref key) = opts.key {
189                return collate(&r.key, key) == Ordering::Equal;
190            }
191
192            if let Some(ref start) = opts.start_key {
193                if opts.descending {
194                    if collate(&r.key, start) == Ordering::Greater {
195                        return false;
196                    }
197                } else if collate(&r.key, start) == Ordering::Less {
198                    return false;
199                }
200            }
201
202            if let Some(ref end) = opts.end_key {
203                if opts.descending {
204                    let cmp = collate(&r.key, end);
205                    if opts.inclusive_end {
206                        if cmp == Ordering::Less {
207                            return false;
208                        }
209                    } else if cmp != Ordering::Greater {
210                        return false;
211                    }
212                } else {
213                    let cmp = collate(&r.key, end);
214                    if opts.inclusive_end {
215                        if cmp == Ordering::Greater {
216                            return false;
217                        }
218                    } else if cmp != Ordering::Less {
219                        return false;
220                    }
221                }
222            }
223
224            true
225        })
226        .collect()
227}
228
229fn group_reduce(rows: &[EmittedRow], reduce: &ReduceFn, group_level: Option<u64>) -> Vec<ViewRow> {
230    if rows.is_empty() {
231        return vec![];
232    }
233
234    let mut result = Vec::new();
235    let mut current_key = group_key(&rows[0].key, group_level);
236    let mut keys = vec![rows[0].key.clone()];
237    let mut values = vec![rows[0].value.clone()];
238
239    for row in &rows[1..] {
240        let gk = group_key(&row.key, group_level);
241        if collate(&gk, &current_key) == Ordering::Equal {
242            keys.push(row.key.clone());
243            values.push(row.value.clone());
244        } else {
245            // Emit group
246            let reduced = apply_reduce(reduce, &keys, &values, false);
247            result.push(ViewRow {
248                id: None,
249                key: current_key,
250                value: reduced,
251                doc: None,
252            });
253
254            current_key = gk;
255            keys = vec![row.key.clone()];
256            values = vec![row.value.clone()];
257        }
258    }
259
260    // Emit last group
261    let reduced = apply_reduce(reduce, &keys, &values, false);
262    result.push(ViewRow {
263        id: None,
264        key: current_key,
265        value: reduced,
266        doc: None,
267    });
268
269    result
270}
271
272fn group_key(key: &serde_json::Value, group_level: Option<u64>) -> serde_json::Value {
273    match group_level {
274        None => key.clone(), // Full grouping
275        Some(level) => {
276            if let Some(arr) = key.as_array() {
277                let truncated: Vec<serde_json::Value> =
278                    arr.iter().take(level as usize).cloned().collect();
279                serde_json::Value::Array(truncated)
280            } else {
281                key.clone()
282            }
283        }
284    }
285}
286
287fn apply_reduce(
288    reduce: &ReduceFn,
289    keys: &[serde_json::Value],
290    values: &[serde_json::Value],
291    rereduce: bool,
292) -> serde_json::Value {
293    match reduce {
294        ReduceFn::Sum => {
295            let sum: f64 = values.iter().filter_map(|v| v.as_f64()).sum();
296            serde_json::json!(sum)
297        }
298        ReduceFn::Count => {
299            serde_json::json!(values.len())
300        }
301        ReduceFn::Stats => {
302            let nums: Vec<f64> = values.iter().filter_map(|v| v.as_f64()).collect();
303            let count = nums.len();
304            if count == 0 {
305                return serde_json::json!({"sum": 0, "count": 0, "min": 0, "max": 0, "sumsqr": 0});
306            }
307            let sum: f64 = nums.iter().sum();
308            let min = nums.iter().copied().fold(f64::INFINITY, f64::min);
309            let max = nums.iter().copied().fold(f64::NEG_INFINITY, f64::max);
310            let sumsqr: f64 = nums.iter().map(|n| n * n).sum();
311            serde_json::json!({
312                "sum": sum,
313                "count": count,
314                "min": min,
315                "max": max,
316                "sumsqr": sumsqr
317            })
318        }
319        ReduceFn::Custom(f) => f(keys, values, rereduce),
320    }
321}
322
323// ---------------------------------------------------------------------------
324// Tests
325// ---------------------------------------------------------------------------
326
#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;
    use rouchdb_core::document::{BulkDocsOptions, Document};
    use serde_json::{Value, json};
    use std::collections::HashMap;

    /// Builds a new (revision-less, non-deleted) person document.
    fn person(id: &str, name: &str, age: u64, city: &str) -> Document {
        Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data: json!({"name": name, "age": age, "city": city}),
            attachments: HashMap::new(),
        }
    }

    /// In-memory database holding Alice (30, NYC), Bob (25, LA) and Charlie (35, NYC).
    async fn setup_db() -> MemoryAdapter {
        let db = MemoryAdapter::new("test");
        let docs = vec![
            person("alice", "Alice", 30, "NYC"),
            person("bob", "Bob", 25, "LA"),
            person("charlie", "Charlie", 35, "NYC"),
        ];
        db.bulk_docs(docs, BulkDocsOptions::new()).await.unwrap();
        db
    }

    /// Map function emitting `(doc[field] or null, 1)` for every document.
    fn key_by(field: &'static str) -> impl Fn(&Value) -> Vec<(Value, Value)> {
        move |doc: &Value| vec![(doc.get(field).cloned().unwrap_or(Value::Null), json!(1))]
    }

    /// Map function emitting `(null, doc.age or 0)` for every document.
    fn ages() -> impl Fn(&Value) -> Vec<(Value, Value)> {
        |doc: &Value| vec![(Value::Null, doc.get("age").cloned().unwrap_or(json!(0)))]
    }

    /// Runs `query_view` against a fresh fixture database and unwraps the result.
    async fn run(
        map_fn: &dyn Fn(&Value) -> Vec<(Value, Value)>,
        reduce_fn: Option<&ReduceFn>,
        opts: ViewQueryOptions,
    ) -> ViewResult {
        let db = setup_db().await;
        query_view(&db, map_fn, reduce_fn, opts).await.unwrap()
    }

    /// Options with only `reduce` switched on.
    fn reducing() -> ViewQueryOptions {
        ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        }
    }

    #[tokio::test]
    async fn map_emits_all() {
        let result = run(&key_by("name"), None, ViewQueryOptions::new()).await;

        assert_eq!(result.total_rows, 3);
        // Sorted by key (name): Alice, Bob, Charlie
        assert_eq!(result.rows[0].key, "Alice");
        assert_eq!(result.rows[1].key, "Bob");
        assert_eq!(result.rows[2].key, "Charlie");
    }

    #[tokio::test]
    async fn map_with_key_filter() {
        let opts = ViewQueryOptions {
            key: Some(json!("Bob")),
            ..ViewQueryOptions::new()
        };
        let result = run(&key_by("name"), None, opts).await;

        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].key, "Bob");
    }

    #[tokio::test]
    async fn reduce_sum() {
        let result = run(&ages(), Some(&ReduceFn::Sum), reducing()).await;

        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].value, json!(90.0)); // 30 + 25 + 35
    }

    #[tokio::test]
    async fn reduce_count() {
        let result = run(&key_by("city"), Some(&ReduceFn::Count), reducing()).await;

        assert_eq!(result.rows[0].value, json!(3));
    }

    #[tokio::test]
    async fn reduce_group() {
        let opts = ViewQueryOptions {
            group: true,
            ..reducing()
        };
        let result = run(&key_by("city"), Some(&ReduceFn::Count), opts).await;

        assert_eq!(result.rows.len(), 2); // LA, NYC
        // LA: 1, NYC: 2
        assert_eq!(result.rows[0].key, "LA");
        assert_eq!(result.rows[0].value, json!(1));
        assert_eq!(result.rows[1].key, "NYC");
        assert_eq!(result.rows[1].value, json!(2));
    }

    #[tokio::test]
    async fn reduce_stats() {
        let result = run(&ages(), Some(&ReduceFn::Stats), reducing()).await;

        let stats = &result.rows[0].value;
        assert_eq!(stats["count"], 3);
        assert_eq!(stats["sum"], 90.0);
        assert_eq!(stats["min"], 25.0);
        assert_eq!(stats["max"], 35.0);
    }

    #[tokio::test]
    async fn descending_and_limit() {
        let opts = ViewQueryOptions {
            descending: true,
            limit: Some(2),
            ..ViewQueryOptions::new()
        };
        let result = run(&key_by("name"), None, opts).await;

        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "Charlie");
        assert_eq!(result.rows[1].key, "Bob");
    }

    #[tokio::test]
    async fn start_end_key_range() {
        let opts = ViewQueryOptions {
            start_key: Some(json!("Bob")),
            end_key: Some(json!("Charlie")),
            ..ViewQueryOptions::new()
        };
        let result = run(&key_by("name"), None, opts).await;

        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "Bob");
        assert_eq!(result.rows[1].key, "Charlie");
    }
}