Skip to main content

rouchdb_query/
mapreduce.rs

1//! Map/reduce view engine.
2//!
3//! Users define views by providing map functions (Rust closures) and optional
4//! reduce functions. The engine runs the map over all documents and collects
5//! key-value pairs, then optionally reduces them.
6
7use std::cmp::Ordering;
8
9use rouchdb_core::adapter::Adapter;
10use rouchdb_core::collation::collate;
11use rouchdb_core::document::AllDocsOptions;
12use rouchdb_core::error::Result;
13
14/// A key-value pair emitted by a map function.
15#[derive(Debug, Clone)]
16pub struct EmittedRow {
17    pub id: String,
18    pub key: serde_json::Value,
19    pub value: serde_json::Value,
20}
21
22/// Built-in reduce functions matching CouchDB's built-ins.
23pub enum ReduceFn {
24    /// Sum all numeric values.
25    Sum,
26    /// Count the number of rows.
27    Count,
28    /// Compute statistics (sum, count, min, max, sumsqr).
29    Stats,
30    /// Custom reduce function.
31    #[allow(clippy::type_complexity)]
32    Custom(Box<dyn Fn(&[serde_json::Value], &[serde_json::Value], bool) -> serde_json::Value>),
33}
34
35/// Options for querying a view.
36#[derive(Debug, Clone, Default)]
37pub struct ViewQueryOptions {
38    /// Only return rows with this exact key.
39    pub key: Option<serde_json::Value>,
40    /// Return rows matching any of these keys, in the given order.
41    pub keys: Option<Vec<serde_json::Value>>,
42    /// Start of key range (inclusive).
43    pub start_key: Option<serde_json::Value>,
44    /// End of key range (inclusive by default).
45    pub end_key: Option<serde_json::Value>,
46    /// Whether to include the end_key in the range.
47    pub inclusive_end: bool,
48    /// Reverse the order.
49    pub descending: bool,
50    /// Number of rows to skip.
51    pub skip: u64,
52    /// Maximum number of rows.
53    pub limit: Option<u64>,
54    /// Include the full document in each row.
55    pub include_docs: bool,
56    /// Whether to run the reduce function.
57    pub reduce: bool,
58    /// Group by key (requires reduce).
59    pub group: bool,
60    /// Group to this many array elements of the key.
61    pub group_level: Option<u64>,
62    /// Use stale index without rebuilding.
63    pub stale: StaleOption,
64}
65
/// Selects whether a view index is brought up to date before it is queried.
///
/// The ad-hoc [`query_view`] does not consult this setting.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum StaleOption {
    /// Rebuild the index before every query. This is the default.
    #[default]
    False,
    /// Query the existing index without rebuilding it.
    Ok,
    /// Query the existing index, then rebuild it in the background.
    UpdateAfter,
}
77
78impl ViewQueryOptions {
79    pub fn new() -> Self {
80        Self {
81            inclusive_end: true,
82            ..Default::default()
83        }
84    }
85}
86
87/// Result of querying a view.
88#[derive(Debug, Clone)]
89pub struct ViewResult {
90    pub total_rows: u64,
91    pub offset: u64,
92    pub rows: Vec<ViewRow>,
93}
94
95/// A single row in a view result.
96#[derive(Debug, Clone)]
97pub struct ViewRow {
98    pub id: Option<String>,
99    pub key: serde_json::Value,
100    pub value: serde_json::Value,
101    pub doc: Option<serde_json::Value>,
102}
103
104/// Run a temporary (ad-hoc) map/reduce query.
105///
106/// The `map_fn` receives a document JSON and returns emitted key-value pairs.
107pub async fn query_view(
108    adapter: &dyn Adapter,
109    map_fn: &dyn Fn(&serde_json::Value) -> Vec<(serde_json::Value, serde_json::Value)>,
110    reduce_fn: Option<&ReduceFn>,
111    opts: ViewQueryOptions,
112) -> Result<ViewResult> {
113    // Run map over all documents
114    let all = adapter
115        .all_docs(AllDocsOptions {
116            include_docs: true,
117            ..AllDocsOptions::new()
118        })
119        .await?;
120
121    let mut emitted: Vec<EmittedRow> = Vec::new();
122
123    for row in &all.rows {
124        if let Some(ref doc_json) = row.doc {
125            let pairs = map_fn(doc_json);
126            for (key, value) in pairs {
127                emitted.push(EmittedRow {
128                    id: row.id.clone(),
129                    key,
130                    value,
131                });
132            }
133        }
134    }
135
136    // Sort by key using CouchDB collation
137    emitted.sort_by(|a, b| {
138        let cmp = collate(&a.key, &b.key);
139        if cmp == Ordering::Equal {
140            a.id.cmp(&b.id)
141        } else {
142            cmp
143        }
144    });
145
146    if opts.descending {
147        emitted.reverse();
148    }
149
150    // Filter by keys (multi-key lookup) or by key range
151    let emitted = if let Some(ref keys) = opts.keys {
152        let mut ordered_rows = Vec::new();
153        for search_key in keys {
154            for row in &emitted {
155                if collate(&row.key, search_key) == Ordering::Equal {
156                    ordered_rows.push(row.clone());
157                }
158            }
159        }
160        ordered_rows
161    } else {
162        filter_by_range(emitted, &opts)
163    };
164
165    let total_rows = emitted.len() as u64;
166
167    // Reduce
168    if opts.reduce
169        && let Some(reduce) = reduce_fn
170    {
171        let rows = if opts.group || opts.group_level.is_some() {
172            group_reduce(&emitted, reduce, opts.group_level)
173        } else {
174            let keys: Vec<serde_json::Value> = emitted.iter().map(|r| r.key.clone()).collect();
175            let values: Vec<serde_json::Value> = emitted.iter().map(|r| r.value.clone()).collect();
176            let result = apply_reduce(reduce, &keys, &values, false);
177            vec![ViewRow {
178                id: None,
179                key: serde_json::Value::Null,
180                value: result,
181                doc: None,
182            }]
183        };
184
185        return Ok(ViewResult {
186            total_rows: rows.len() as u64,
187            offset: 0,
188            rows,
189        });
190    }
191
192    // Apply skip and limit
193    let skip = opts.skip as usize;
194    let rows: Vec<ViewRow> = emitted
195        .into_iter()
196        .skip(skip)
197        .take(opts.limit.unwrap_or(u64::MAX) as usize)
198        .map(|r| ViewRow {
199            id: Some(r.id),
200            key: r.key,
201            value: r.value,
202            doc: None,
203        })
204        .collect();
205
206    Ok(ViewResult {
207        total_rows,
208        offset: opts.skip,
209        rows,
210    })
211}
212
213fn filter_by_range(rows: Vec<EmittedRow>, opts: &ViewQueryOptions) -> Vec<EmittedRow> {
214    rows.into_iter()
215        .filter(|r| {
216            if let Some(ref key) = opts.key {
217                return collate(&r.key, key) == Ordering::Equal;
218            }
219
220            if let Some(ref start) = opts.start_key {
221                if opts.descending {
222                    if collate(&r.key, start) == Ordering::Greater {
223                        return false;
224                    }
225                } else if collate(&r.key, start) == Ordering::Less {
226                    return false;
227                }
228            }
229
230            if let Some(ref end) = opts.end_key {
231                if opts.descending {
232                    let cmp = collate(&r.key, end);
233                    if opts.inclusive_end {
234                        if cmp == Ordering::Less {
235                            return false;
236                        }
237                    } else if cmp != Ordering::Greater {
238                        return false;
239                    }
240                } else {
241                    let cmp = collate(&r.key, end);
242                    if opts.inclusive_end {
243                        if cmp == Ordering::Greater {
244                            return false;
245                        }
246                    } else if cmp != Ordering::Less {
247                        return false;
248                    }
249                }
250            }
251
252            true
253        })
254        .collect()
255}
256
257fn group_reduce(rows: &[EmittedRow], reduce: &ReduceFn, group_level: Option<u64>) -> Vec<ViewRow> {
258    if rows.is_empty() {
259        return vec![];
260    }
261
262    let mut result = Vec::new();
263    let mut current_key = group_key(&rows[0].key, group_level);
264    let mut keys = vec![rows[0].key.clone()];
265    let mut values = vec![rows[0].value.clone()];
266
267    for row in &rows[1..] {
268        let gk = group_key(&row.key, group_level);
269        if collate(&gk, &current_key) == Ordering::Equal {
270            keys.push(row.key.clone());
271            values.push(row.value.clone());
272        } else {
273            // Emit group
274            let reduced = apply_reduce(reduce, &keys, &values, false);
275            result.push(ViewRow {
276                id: None,
277                key: current_key,
278                value: reduced,
279                doc: None,
280            });
281
282            current_key = gk;
283            keys = vec![row.key.clone()];
284            values = vec![row.value.clone()];
285        }
286    }
287
288    // Emit last group
289    let reduced = apply_reduce(reduce, &keys, &values, false);
290    result.push(ViewRow {
291        id: None,
292        key: current_key,
293        value: reduced,
294        doc: None,
295    });
296
297    result
298}
299
300fn group_key(key: &serde_json::Value, group_level: Option<u64>) -> serde_json::Value {
301    match group_level {
302        None => key.clone(), // Full grouping
303        Some(level) => {
304            if let Some(arr) = key.as_array() {
305                let truncated: Vec<serde_json::Value> =
306                    arr.iter().take(level as usize).cloned().collect();
307                serde_json::Value::Array(truncated)
308            } else {
309                key.clone()
310            }
311        }
312    }
313}
314
315fn apply_reduce(
316    reduce: &ReduceFn,
317    keys: &[serde_json::Value],
318    values: &[serde_json::Value],
319    rereduce: bool,
320) -> serde_json::Value {
321    match reduce {
322        ReduceFn::Sum => {
323            let sum: f64 = values.iter().filter_map(|v| v.as_f64()).sum();
324            serde_json::json!(sum)
325        }
326        ReduceFn::Count => {
327            serde_json::json!(values.len())
328        }
329        ReduceFn::Stats => {
330            let nums: Vec<f64> = values.iter().filter_map(|v| v.as_f64()).collect();
331            let count = nums.len();
332            if count == 0 {
333                return serde_json::json!({"sum": 0, "count": 0, "min": 0, "max": 0, "sumsqr": 0});
334            }
335            let sum: f64 = nums.iter().sum();
336            let min = nums.iter().copied().fold(f64::INFINITY, f64::min);
337            let max = nums.iter().copied().fold(f64::NEG_INFINITY, f64::max);
338            let sumsqr: f64 = nums.iter().map(|n| n * n).sum();
339            serde_json::json!({
340                "sum": sum,
341                "count": count,
342                "min": min,
343                "max": max,
344                "sumsqr": sumsqr
345            })
346        }
347        ReduceFn::Custom(f) => f(keys, values, rereduce),
348    }
349}
350
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;
    use rouchdb_core::document::{BulkDocsOptions, Document};
    use std::collections::HashMap;

    /// Output shape of a map function.
    type Emitted = Vec<(serde_json::Value, serde_json::Value)>;

    /// Builds an unsaved, revision-less document with the given id and body.
    fn new_doc(id: &str, data: serde_json::Value) -> Document {
        Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data,
            attachments: HashMap::new(),
        }
    }

    /// Creates an in-memory database holding three people:
    /// Alice (30, NYC), Bob (25, LA) and Charlie (35, NYC).
    async fn setup_db() -> MemoryAdapter {
        let db = MemoryAdapter::new("test");
        let people = vec![
            new_doc(
                "alice",
                serde_json::json!({"name": "Alice", "age": 30, "city": "NYC"}),
            ),
            new_doc(
                "bob",
                serde_json::json!({"name": "Bob", "age": 25, "city": "LA"}),
            ),
            new_doc(
                "charlie",
                serde_json::json!({"name": "Charlie", "age": 35, "city": "NYC"}),
            ),
        ];
        db.bulk_docs(people, BulkDocsOptions::new()).await.unwrap();
        db
    }

    /// Emits `(doc[field], 1)`, using `null` when the field is missing.
    fn emit_field(doc: &serde_json::Value, field: &str) -> Emitted {
        let key = doc.get(field).cloned().unwrap_or(serde_json::Value::Null);
        vec![(key, serde_json::json!(1))]
    }

    /// Map function: one row per document, keyed by `name`.
    fn by_name(doc: &serde_json::Value) -> Emitted {
        emit_field(doc, "name")
    }

    /// Map function: one row per document, keyed by `city`.
    fn by_city(doc: &serde_json::Value) -> Emitted {
        emit_field(doc, "city")
    }

    /// Map function: one `null`-keyed row per document whose value is `age`
    /// (0 when missing).
    fn age_value(doc: &serde_json::Value) -> Emitted {
        let age = doc.get("age").cloned().unwrap_or(serde_json::json!(0));
        vec![(serde_json::Value::Null, age)]
    }

    #[tokio::test]
    async fn map_emits_all() {
        let db = setup_db().await;

        let result = query_view(&db, &by_name, None, ViewQueryOptions::new())
            .await
            .unwrap();

        assert_eq!(result.total_rows, 3);
        // Rows come back in key (name) order.
        assert_eq!(result.rows[0].key, "Alice");
        assert_eq!(result.rows[1].key, "Bob");
        assert_eq!(result.rows[2].key, "Charlie");
    }

    #[tokio::test]
    async fn map_with_key_filter() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            key: Some(serde_json::json!("Bob")),
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &by_name, None, opts).await.unwrap();

        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].key, "Bob");
    }

    #[tokio::test]
    async fn reduce_sum() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &age_value, Some(&ReduceFn::Sum), opts)
            .await
            .unwrap();

        assert_eq!(result.rows.len(), 1);
        // 30 + 25 + 35
        assert_eq!(result.rows[0].value, serde_json::json!(90.0));
    }

    #[tokio::test]
    async fn reduce_count() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &by_city, Some(&ReduceFn::Count), opts)
            .await
            .unwrap();

        assert_eq!(result.rows[0].value, serde_json::json!(3));
    }

    #[tokio::test]
    async fn reduce_group() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            reduce: true,
            group: true,
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &by_city, Some(&ReduceFn::Count), opts)
            .await
            .unwrap();

        // Two groups in key order: LA (1 person), NYC (2 people).
        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "LA");
        assert_eq!(result.rows[0].value, serde_json::json!(1));
        assert_eq!(result.rows[1].key, "NYC");
        assert_eq!(result.rows[1].value, serde_json::json!(2));
    }

    #[tokio::test]
    async fn reduce_stats() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &age_value, Some(&ReduceFn::Stats), opts)
            .await
            .unwrap();

        let stats = &result.rows[0].value;
        assert_eq!(stats["count"], 3);
        assert_eq!(stats["sum"], 90.0);
        assert_eq!(stats["min"], 25.0);
        assert_eq!(stats["max"], 35.0);
    }

    #[tokio::test]
    async fn descending_and_limit() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            descending: true,
            limit: Some(2),
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &by_name, None, opts).await.unwrap();

        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "Charlie");
        assert_eq!(result.rows[1].key, "Bob");
    }

    #[tokio::test]
    async fn start_end_key_range() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            start_key: Some(serde_json::json!("Bob")),
            end_key: Some(serde_json::json!("Charlie")),
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &by_name, None, opts).await.unwrap();

        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "Bob");
        assert_eq!(result.rows[1].key, "Charlie");
    }
}