Skip to main content

rouchdb_query/
mapreduce.rs

1//! Map/reduce view engine.
2//!
3//! Users define views by providing map functions (Rust closures) and optional
4//! reduce functions. The engine runs the map over all documents and collects
5//! key-value pairs, then optionally reduces them.
6
7use std::cmp::Ordering;
8
9use rouchdb_core::adapter::Adapter;
10use rouchdb_core::collation::collate;
11use rouchdb_core::document::AllDocsOptions;
12use rouchdb_core::error::Result;
13
/// A key-value pair emitted by a map function.
///
/// `query_view` creates one of these for every `(key, value)` pair returned by
/// the map function, tagged with the id of the row it was mapped from.
#[derive(Debug, Clone)]
pub struct EmittedRow {
    /// Id of the `all_docs` row whose document produced this pair.
    pub id: String,
    /// Emitted key; rows are sorted, filtered and grouped by it.
    pub key: serde_json::Value,
    /// Emitted value; handed to the reduce function when reducing.
    pub value: serde_json::Value,
}
21
/// Built-in reduce functions matching CouchDB's built-ins.
///
/// The built-in variants (`Sum`, `Count`, `Stats`) do not look at the
/// `rereduce` flag; only `Custom` receives it.
pub enum ReduceFn {
    /// Sum all numeric values.
    ///
    /// Values that are not numbers are skipped; the total is computed as `f64`.
    Sum,
    /// Count the number of rows.
    Count,
    /// Compute statistics (sum, count, min, max, sumsqr).
    ///
    /// Only numeric values contribute. When there are none, every field of
    /// the resulting object is `0`.
    Stats,
    /// Custom reduce function.
    ///
    /// Called as `f(keys, values, rereduce)`. Every call site in this file
    /// passes `rereduce = false`.
    // The boxed closure signature is long but used in exactly one place, so a
    // type alias would add indirection without improving readability.
    #[allow(clippy::type_complexity)]
    Custom(Box<dyn Fn(&[serde_json::Value], &[serde_json::Value], bool) -> serde_json::Value>),
}
34
/// Options for querying a view.
///
/// Selection precedence in `query_view`: `keys` (if set) is used on its own;
/// otherwise `key` (if set) is used on its own; otherwise the
/// `start_key`/`end_key` range applies.
///
/// NOTE: the derived `Default` sets every `bool` to `false`, including
/// `inclusive_end`. Use `ViewQueryOptions::new()` to get an inclusive end key.
#[derive(Debug, Clone, Default)]
pub struct ViewQueryOptions {
    /// Only return rows with this exact key.
    ///
    /// When set, `start_key` and `end_key` are not consulted.
    pub key: Option<serde_json::Value>,
    /// Return rows matching any of these keys, in the given order.
    ///
    /// When set, `key`, `start_key` and `end_key` are not consulted.
    pub keys: Option<Vec<serde_json::Value>>,
    /// Start of key range (inclusive).
    ///
    /// With `descending`, this is the upper bound of the range.
    pub start_key: Option<serde_json::Value>,
    /// End of key range (inclusive by default).
    ///
    /// With `descending`, this is the lower bound of the range.
    pub end_key: Option<serde_json::Value>,
    /// Whether to include the end_key in the range.
    ///
    /// `false` under `Default::default()`, `true` under `ViewQueryOptions::new()`.
    pub inclusive_end: bool,
    /// Reverse the order.
    pub descending: bool,
    /// Number of rows to skip.
    ///
    /// Applied after filtering; for reduce queries it is applied to the
    /// reduced rows.
    pub skip: u64,
    /// Maximum number of rows.
    ///
    /// `None` means no limit. Applied after `skip`.
    pub limit: Option<u64>,
    /// Include the full document in each row.
    pub include_docs: bool,
    /// Whether to run the reduce function.
    ///
    /// Only has an effect when a reduce function is also passed to `query_view`.
    pub reduce: bool,
    /// Group by key (requires reduce).
    ///
    /// Overridden by `group_level` whenever that is `Some`.
    pub group: bool,
    /// Group to this many array elements of the key.
    ///
    /// `Some(0)` produces a single global group. Keys that are not arrays are
    /// grouped by their whole value.
    pub group_level: Option<u64>,
    /// Use stale index without rebuilding.
    ///
    /// NOTE(review): `query_view` in this file does not read this field.
    pub stale: StaleOption,
}
65
/// Controls whether the index is rebuilt before querying.
///
/// NOTE(review): the ad-hoc `query_view` in this file never reads this
/// option; presumably it is consumed by a persistent-index code path
/// elsewhere — confirm against the callers.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum StaleOption {
    /// Always rebuild the index before querying (default).
    #[default]
    False,
    /// Use the index as-is, do not rebuild.
    Ok,
    /// Use the index as-is, then rebuild in the background.
    UpdateAfter,
}
77
78impl ViewQueryOptions {
79    pub fn new() -> Self {
80        Self {
81            inclusive_end: true,
82            ..Default::default()
83        }
84    }
85}
86
/// Result of querying a view.
#[derive(Debug, Clone)]
pub struct ViewResult {
    /// Number of rows before `skip`/`limit` were applied.
    ///
    /// As filled in by `query_view`: for map queries this counts the rows
    /// that survived key filtering; for reduce queries it counts the reduced
    /// rows.
    pub total_rows: u64,
    /// The `skip` value that was requested.
    pub offset: u64,
    /// The rows of this page of results.
    pub rows: Vec<ViewRow>,
}
94
/// A single row in a view result.
#[derive(Debug, Clone)]
pub struct ViewRow {
    /// Id of the document that emitted the row; `None` for reduced rows.
    pub id: Option<String>,
    /// Emitted key, or the group key for grouped reduce rows (`null` for a
    /// single global reduce row).
    pub key: serde_json::Value,
    /// Emitted value, or the reduce result for reduced rows.
    pub value: serde_json::Value,
    /// Full document body, when one is attached to the row.
    pub doc: Option<serde_json::Value>,
}
103
104/// Run a temporary (ad-hoc) map/reduce query.
105///
106/// The `map_fn` receives a document JSON and returns emitted key-value pairs.
107pub async fn query_view(
108    adapter: &dyn Adapter,
109    map_fn: &dyn Fn(&serde_json::Value) -> Vec<(serde_json::Value, serde_json::Value)>,
110    reduce_fn: Option<&ReduceFn>,
111    opts: ViewQueryOptions,
112) -> Result<ViewResult> {
113    // Run map over all documents
114    let all = adapter
115        .all_docs(AllDocsOptions {
116            include_docs: true,
117            ..AllDocsOptions::new()
118        })
119        .await?;
120
121    let mut emitted: Vec<EmittedRow> = Vec::new();
122
123    for row in &all.rows {
124        if let Some(ref doc_json) = row.doc {
125            let pairs = map_fn(doc_json);
126            for (key, value) in pairs {
127                emitted.push(EmittedRow {
128                    id: row.id.clone(),
129                    key,
130                    value,
131                });
132            }
133        }
134    }
135
136    // Sort by key using CouchDB collation
137    emitted.sort_by(|a, b| {
138        let cmp = collate(&a.key, &b.key);
139        if cmp == Ordering::Equal {
140            a.id.cmp(&b.id)
141        } else {
142            cmp
143        }
144    });
145
146    if opts.descending {
147        emitted.reverse();
148    }
149
150    // Filter by keys (multi-key lookup) or by key range
151    let emitted = if let Some(ref keys) = opts.keys {
152        let mut ordered_rows = Vec::new();
153        for search_key in keys {
154            for row in &emitted {
155                if collate(&row.key, search_key) == Ordering::Equal {
156                    ordered_rows.push(row.clone());
157                }
158            }
159        }
160        ordered_rows
161    } else {
162        filter_by_range(emitted, &opts)
163    };
164
165    let total_rows = emitted.len() as u64;
166
167    // Reduce
168    if opts.reduce
169        && let Some(reduce) = reduce_fn
170    {
171        // group_level == Some(0) means a single global group (no grouping),
172        // matching CouchDB where group_level overrides group.
173        let grouped = opts.group_level.map(|l| l > 0).unwrap_or(opts.group);
174        let rows = if grouped {
175            group_reduce(&emitted, reduce, opts.group_level)
176        } else if emitted.is_empty() {
177            // An empty reduce yields no rows (CouchDB returns {"rows":[]}),
178            // not a spurious zero row.
179            Vec::new()
180        } else {
181            let keys: Vec<serde_json::Value> = emitted.iter().map(|r| r.key.clone()).collect();
182            let values: Vec<serde_json::Value> = emitted.iter().map(|r| r.value.clone()).collect();
183            let result = apply_reduce(reduce, &keys, &values, false);
184            vec![ViewRow {
185                id: None,
186                key: serde_json::Value::Null,
187                value: result,
188                doc: None,
189            }]
190        };
191
192        // Apply skip/limit to the reduced/grouped rows as well.
193        let reduced_total = rows.len() as u64;
194        let skip = opts.skip as usize;
195        let rows: Vec<ViewRow> = rows
196            .into_iter()
197            .skip(skip)
198            .take(opts.limit.unwrap_or(u64::MAX) as usize)
199            .collect();
200
201        return Ok(ViewResult {
202            total_rows: reduced_total,
203            offset: opts.skip,
204            rows,
205        });
206    }
207
208    // Apply skip and limit
209    let skip = opts.skip as usize;
210    let rows: Vec<ViewRow> = emitted
211        .into_iter()
212        .skip(skip)
213        .take(opts.limit.unwrap_or(u64::MAX) as usize)
214        .map(|r| ViewRow {
215            id: Some(r.id),
216            key: r.key,
217            value: r.value,
218            doc: None,
219        })
220        .collect();
221
222    Ok(ViewResult {
223        total_rows,
224        offset: opts.skip,
225        rows,
226    })
227}
228
229fn filter_by_range(rows: Vec<EmittedRow>, opts: &ViewQueryOptions) -> Vec<EmittedRow> {
230    rows.into_iter()
231        .filter(|r| {
232            if let Some(ref key) = opts.key {
233                return collate(&r.key, key) == Ordering::Equal;
234            }
235
236            if let Some(ref start) = opts.start_key {
237                if opts.descending {
238                    if collate(&r.key, start) == Ordering::Greater {
239                        return false;
240                    }
241                } else if collate(&r.key, start) == Ordering::Less {
242                    return false;
243                }
244            }
245
246            if let Some(ref end) = opts.end_key {
247                if opts.descending {
248                    let cmp = collate(&r.key, end);
249                    if opts.inclusive_end {
250                        if cmp == Ordering::Less {
251                            return false;
252                        }
253                    } else if cmp != Ordering::Greater {
254                        return false;
255                    }
256                } else {
257                    let cmp = collate(&r.key, end);
258                    if opts.inclusive_end {
259                        if cmp == Ordering::Greater {
260                            return false;
261                        }
262                    } else if cmp != Ordering::Less {
263                        return false;
264                    }
265                }
266            }
267
268            true
269        })
270        .collect()
271}
272
273fn group_reduce(rows: &[EmittedRow], reduce: &ReduceFn, group_level: Option<u64>) -> Vec<ViewRow> {
274    if rows.is_empty() {
275        return vec![];
276    }
277
278    let mut result = Vec::new();
279    let mut current_key = group_key(&rows[0].key, group_level);
280    let mut keys = vec![rows[0].key.clone()];
281    let mut values = vec![rows[0].value.clone()];
282
283    for row in &rows[1..] {
284        let gk = group_key(&row.key, group_level);
285        if collate(&gk, &current_key) == Ordering::Equal {
286            keys.push(row.key.clone());
287            values.push(row.value.clone());
288        } else {
289            // Emit group
290            let reduced = apply_reduce(reduce, &keys, &values, false);
291            result.push(ViewRow {
292                id: None,
293                key: current_key,
294                value: reduced,
295                doc: None,
296            });
297
298            current_key = gk;
299            keys = vec![row.key.clone()];
300            values = vec![row.value.clone()];
301        }
302    }
303
304    // Emit last group
305    let reduced = apply_reduce(reduce, &keys, &values, false);
306    result.push(ViewRow {
307        id: None,
308        key: current_key,
309        value: reduced,
310        doc: None,
311    });
312
313    result
314}
315
316fn group_key(key: &serde_json::Value, group_level: Option<u64>) -> serde_json::Value {
317    match group_level {
318        None => key.clone(), // Full grouping
319        Some(level) => {
320            if let Some(arr) = key.as_array() {
321                let truncated: Vec<serde_json::Value> =
322                    arr.iter().take(level as usize).cloned().collect();
323                serde_json::Value::Array(truncated)
324            } else {
325                key.clone()
326            }
327        }
328    }
329}
330
331fn apply_reduce(
332    reduce: &ReduceFn,
333    keys: &[serde_json::Value],
334    values: &[serde_json::Value],
335    rereduce: bool,
336) -> serde_json::Value {
337    match reduce {
338        ReduceFn::Sum => {
339            let sum: f64 = values.iter().filter_map(|v| v.as_f64()).sum();
340            serde_json::json!(sum)
341        }
342        ReduceFn::Count => {
343            serde_json::json!(values.len())
344        }
345        ReduceFn::Stats => {
346            let nums: Vec<f64> = values.iter().filter_map(|v| v.as_f64()).collect();
347            let count = nums.len();
348            if count == 0 {
349                return serde_json::json!({"sum": 0, "count": 0, "min": 0, "max": 0, "sumsqr": 0});
350            }
351            let sum: f64 = nums.iter().sum();
352            let min = nums.iter().copied().fold(f64::INFINITY, f64::min);
353            let max = nums.iter().copied().fold(f64::NEG_INFINITY, f64::max);
354            let sumsqr: f64 = nums.iter().map(|n| n * n).sum();
355            serde_json::json!({
356                "sum": sum,
357                "count": count,
358                "min": min,
359                "max": max,
360                "sumsqr": sumsqr
361            })
362        }
363        ReduceFn::Custom(f) => f(keys, values, rereduce),
364    }
365}
366
367// ---------------------------------------------------------------------------
368// Tests
369// ---------------------------------------------------------------------------
370
#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;
    use rouchdb_core::document::{BulkDocsOptions, Document};
    use std::collections::HashMap;

    /// Emitted `(key, value)` pairs, as produced by a map function.
    type Emitted = Vec<(serde_json::Value, serde_json::Value)>;

    /// Builds a live, revision-less person document without attachments.
    fn person(id: &str, name: &str, age: i32, city: &str) -> Document {
        Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data: serde_json::json!({"name": name, "age": age, "city": city}),
            attachments: HashMap::new(),
        }
    }

    /// In-memory database holding Alice (30, NYC), Bob (25, LA) and
    /// Charlie (35, NYC).
    async fn setup_db() -> MemoryAdapter {
        let db = MemoryAdapter::new("test");
        let people = vec![
            person("alice", "Alice", 30, "NYC"),
            person("bob", "Bob", 25, "LA"),
            person("charlie", "Charlie", 35, "NYC"),
        ];
        db.bulk_docs(people, BulkDocsOptions::new()).await.unwrap();
        db
    }

    /// Map function emitting `(doc[field] or null, 1)`.
    fn emit_field(field: &'static str) -> impl Fn(&serde_json::Value) -> Emitted {
        move |doc: &serde_json::Value| {
            let key = doc.get(field).cloned().unwrap_or(serde_json::Value::Null);
            vec![(key, serde_json::json!(1))]
        }
    }

    /// Map function emitting `(null, doc.age or 0)`.
    fn emit_age(doc: &serde_json::Value) -> Emitted {
        let age = doc.get("age").cloned().unwrap_or(serde_json::json!(0));
        vec![(serde_json::Value::Null, age)]
    }

    /// Runs `query_view` against a freshly populated database.
    async fn run_view(
        map_fn: &dyn Fn(&serde_json::Value) -> Emitted,
        reduce_fn: Option<&ReduceFn>,
        opts: ViewQueryOptions,
    ) -> ViewResult {
        let db = setup_db().await;
        query_view(&db, map_fn, reduce_fn, opts).await.unwrap()
    }

    #[tokio::test]
    async fn map_emits_all() {
        let result = run_view(&emit_field("name"), None, ViewQueryOptions::new()).await;

        assert_eq!(result.total_rows, 3);
        // Sorted by key (name): Alice, Bob, Charlie
        assert_eq!(result.rows[0].key, "Alice");
        assert_eq!(result.rows[1].key, "Bob");
        assert_eq!(result.rows[2].key, "Charlie");
    }

    #[tokio::test]
    async fn reduce_group_level_zero_is_global() {
        let opts = ViewQueryOptions {
            reduce: true,
            group_level: Some(0),
            ..ViewQueryOptions::new()
        };
        let result = run_view(&emit_field("city"), Some(&ReduceFn::Count), opts).await;

        // group_level=0 collapses everything into one global group.
        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].value, serde_json::json!(3));
    }

    #[tokio::test]
    async fn reduce_grouped_honors_skip_and_limit() {
        let opts = ViewQueryOptions {
            reduce: true,
            group: true,
            skip: 1,
            limit: Some(1),
            ..ViewQueryOptions::new()
        };
        let result = run_view(&emit_field("city"), Some(&ReduceFn::Count), opts).await;

        // Groups sorted by key: "LA"(1), "NYC"(2). skip 1 -> NYC; limit 1.
        assert_eq!(result.total_rows, 2);
        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].key, "NYC");
        assert_eq!(result.rows[0].value, serde_json::json!(2));
    }

    #[tokio::test]
    async fn map_with_key_filter() {
        let opts = ViewQueryOptions {
            key: Some(serde_json::json!("Bob")),
            ..ViewQueryOptions::new()
        };
        let result = run_view(&emit_field("name"), None, opts).await;

        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].key, "Bob");
    }

    #[tokio::test]
    async fn reduce_sum() {
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };
        let result = run_view(&emit_age, Some(&ReduceFn::Sum), opts).await;

        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].value, serde_json::json!(90.0)); // 30 + 25 + 35
    }

    #[tokio::test]
    async fn reduce_count() {
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };
        let result = run_view(&emit_field("city"), Some(&ReduceFn::Count), opts).await;

        assert_eq!(result.rows[0].value, serde_json::json!(3));
    }

    #[tokio::test]
    async fn reduce_group() {
        let opts = ViewQueryOptions {
            reduce: true,
            group: true,
            ..ViewQueryOptions::new()
        };
        let result = run_view(&emit_field("city"), Some(&ReduceFn::Count), opts).await;

        assert_eq!(result.rows.len(), 2); // LA, NYC
        // LA: 1, NYC: 2
        assert_eq!(result.rows[0].key, "LA");
        assert_eq!(result.rows[0].value, serde_json::json!(1));
        assert_eq!(result.rows[1].key, "NYC");
        assert_eq!(result.rows[1].value, serde_json::json!(2));
    }

    #[tokio::test]
    async fn reduce_stats() {
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };
        let result = run_view(&emit_age, Some(&ReduceFn::Stats), opts).await;

        let stats = &result.rows[0].value;
        assert_eq!(stats["count"], 3);
        assert_eq!(stats["sum"], 90.0);
        assert_eq!(stats["min"], 25.0);
        assert_eq!(stats["max"], 35.0);
    }

    #[tokio::test]
    async fn descending_and_limit() {
        let opts = ViewQueryOptions {
            descending: true,
            limit: Some(2),
            ..ViewQueryOptions::new()
        };
        let result = run_view(&emit_field("name"), None, opts).await;

        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "Charlie");
        assert_eq!(result.rows[1].key, "Bob");
    }

    #[tokio::test]
    async fn start_end_key_range() {
        let opts = ViewQueryOptions {
            start_key: Some(serde_json::json!("Bob")),
            end_key: Some(serde_json::json!("Charlie")),
            ..ViewQueryOptions::new()
        };
        let result = run_view(&emit_field("name"), None, opts).await;

        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "Bob");
        assert_eq!(result.rows[1].key, "Charlie");
    }
}
651}