1use std::cmp::Ordering;
8
9use rouchdb_core::adapter::Adapter;
10use rouchdb_core::collation::collate;
11use rouchdb_core::document::AllDocsOptions;
12use rouchdb_core::error::Result;
13
14#[derive(Debug, Clone)]
16pub struct EmittedRow {
17 pub id: String,
18 pub key: serde_json::Value,
19 pub value: serde_json::Value,
20}
21
22pub enum ReduceFn {
24 Sum,
26 Count,
28 Stats,
30 #[allow(clippy::type_complexity)]
32 Custom(Box<dyn Fn(&[serde_json::Value], &[serde_json::Value], bool) -> serde_json::Value>),
33}
34
35#[derive(Debug, Clone, Default)]
37pub struct ViewQueryOptions {
38 pub key: Option<serde_json::Value>,
40 pub keys: Option<Vec<serde_json::Value>>,
42 pub start_key: Option<serde_json::Value>,
44 pub end_key: Option<serde_json::Value>,
46 pub inclusive_end: bool,
48 pub descending: bool,
50 pub skip: u64,
52 pub limit: Option<u64>,
54 pub include_docs: bool,
56 pub reduce: bool,
58 pub group: bool,
60 pub group_level: Option<u64>,
62 pub stale: StaleOption,
64}
65
/// Freshness preference carried by [`ViewQueryOptions::stale`].
///
/// NOTE(review): the variant names look like CouchDB's `stale` query
/// parameter (`false` / `ok` / `update_after`) — confirm the intended
/// semantics with whoever consumes this value.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum StaleOption {
    /// The default variant.
    #[default]
    False,
    /// See the review note on the enum.
    Ok,
    /// See the review note on the enum.
    UpdateAfter,
}
77
78impl ViewQueryOptions {
79 pub fn new() -> Self {
80 Self {
81 inclusive_end: true,
82 ..Default::default()
83 }
84 }
85}
86
87#[derive(Debug, Clone)]
89pub struct ViewResult {
90 pub total_rows: u64,
91 pub offset: u64,
92 pub rows: Vec<ViewRow>,
93}
94
95#[derive(Debug, Clone)]
97pub struct ViewRow {
98 pub id: Option<String>,
99 pub key: serde_json::Value,
100 pub value: serde_json::Value,
101 pub doc: Option<serde_json::Value>,
102}
103
104pub async fn query_view(
108 adapter: &dyn Adapter,
109 map_fn: &dyn Fn(&serde_json::Value) -> Vec<(serde_json::Value, serde_json::Value)>,
110 reduce_fn: Option<&ReduceFn>,
111 opts: ViewQueryOptions,
112) -> Result<ViewResult> {
113 let all = adapter
115 .all_docs(AllDocsOptions {
116 include_docs: true,
117 ..AllDocsOptions::new()
118 })
119 .await?;
120
121 let mut emitted: Vec<EmittedRow> = Vec::new();
122
123 for row in &all.rows {
124 if let Some(ref doc_json) = row.doc {
125 let pairs = map_fn(doc_json);
126 for (key, value) in pairs {
127 emitted.push(EmittedRow {
128 id: row.id.clone(),
129 key,
130 value,
131 });
132 }
133 }
134 }
135
136 emitted.sort_by(|a, b| {
138 let cmp = collate(&a.key, &b.key);
139 if cmp == Ordering::Equal {
140 a.id.cmp(&b.id)
141 } else {
142 cmp
143 }
144 });
145
146 if opts.descending {
147 emitted.reverse();
148 }
149
150 let emitted = if let Some(ref keys) = opts.keys {
152 let mut ordered_rows = Vec::new();
153 for search_key in keys {
154 for row in &emitted {
155 if collate(&row.key, search_key) == Ordering::Equal {
156 ordered_rows.push(row.clone());
157 }
158 }
159 }
160 ordered_rows
161 } else {
162 filter_by_range(emitted, &opts)
163 };
164
165 let total_rows = emitted.len() as u64;
166
167 if opts.reduce
169 && let Some(reduce) = reduce_fn
170 {
171 let grouped = opts.group_level.map(|l| l > 0).unwrap_or(opts.group);
174 let rows = if grouped {
175 group_reduce(&emitted, reduce, opts.group_level)
176 } else if emitted.is_empty() {
177 Vec::new()
180 } else {
181 let keys: Vec<serde_json::Value> = emitted.iter().map(|r| r.key.clone()).collect();
182 let values: Vec<serde_json::Value> = emitted.iter().map(|r| r.value.clone()).collect();
183 let result = apply_reduce(reduce, &keys, &values, false);
184 vec![ViewRow {
185 id: None,
186 key: serde_json::Value::Null,
187 value: result,
188 doc: None,
189 }]
190 };
191
192 let reduced_total = rows.len() as u64;
194 let skip = opts.skip as usize;
195 let rows: Vec<ViewRow> = rows
196 .into_iter()
197 .skip(skip)
198 .take(opts.limit.unwrap_or(u64::MAX) as usize)
199 .collect();
200
201 return Ok(ViewResult {
202 total_rows: reduced_total,
203 offset: opts.skip,
204 rows,
205 });
206 }
207
208 let skip = opts.skip as usize;
210 let rows: Vec<ViewRow> = emitted
211 .into_iter()
212 .skip(skip)
213 .take(opts.limit.unwrap_or(u64::MAX) as usize)
214 .map(|r| ViewRow {
215 id: Some(r.id),
216 key: r.key,
217 value: r.value,
218 doc: None,
219 })
220 .collect();
221
222 Ok(ViewResult {
223 total_rows,
224 offset: opts.skip,
225 rows,
226 })
227}
228
229fn filter_by_range(rows: Vec<EmittedRow>, opts: &ViewQueryOptions) -> Vec<EmittedRow> {
230 rows.into_iter()
231 .filter(|r| {
232 if let Some(ref key) = opts.key {
233 return collate(&r.key, key) == Ordering::Equal;
234 }
235
236 if let Some(ref start) = opts.start_key {
237 if opts.descending {
238 if collate(&r.key, start) == Ordering::Greater {
239 return false;
240 }
241 } else if collate(&r.key, start) == Ordering::Less {
242 return false;
243 }
244 }
245
246 if let Some(ref end) = opts.end_key {
247 if opts.descending {
248 let cmp = collate(&r.key, end);
249 if opts.inclusive_end {
250 if cmp == Ordering::Less {
251 return false;
252 }
253 } else if cmp != Ordering::Greater {
254 return false;
255 }
256 } else {
257 let cmp = collate(&r.key, end);
258 if opts.inclusive_end {
259 if cmp == Ordering::Greater {
260 return false;
261 }
262 } else if cmp != Ordering::Less {
263 return false;
264 }
265 }
266 }
267
268 true
269 })
270 .collect()
271}
272
273fn group_reduce(rows: &[EmittedRow], reduce: &ReduceFn, group_level: Option<u64>) -> Vec<ViewRow> {
274 if rows.is_empty() {
275 return vec![];
276 }
277
278 let mut result = Vec::new();
279 let mut current_key = group_key(&rows[0].key, group_level);
280 let mut keys = vec![rows[0].key.clone()];
281 let mut values = vec![rows[0].value.clone()];
282
283 for row in &rows[1..] {
284 let gk = group_key(&row.key, group_level);
285 if collate(&gk, ¤t_key) == Ordering::Equal {
286 keys.push(row.key.clone());
287 values.push(row.value.clone());
288 } else {
289 let reduced = apply_reduce(reduce, &keys, &values, false);
291 result.push(ViewRow {
292 id: None,
293 key: current_key,
294 value: reduced,
295 doc: None,
296 });
297
298 current_key = gk;
299 keys = vec![row.key.clone()];
300 values = vec![row.value.clone()];
301 }
302 }
303
304 let reduced = apply_reduce(reduce, &keys, &values, false);
306 result.push(ViewRow {
307 id: None,
308 key: current_key,
309 value: reduced,
310 doc: None,
311 });
312
313 result
314}
315
316fn group_key(key: &serde_json::Value, group_level: Option<u64>) -> serde_json::Value {
317 match group_level {
318 None => key.clone(), Some(level) => {
320 if let Some(arr) = key.as_array() {
321 let truncated: Vec<serde_json::Value> =
322 arr.iter().take(level as usize).cloned().collect();
323 serde_json::Value::Array(truncated)
324 } else {
325 key.clone()
326 }
327 }
328 }
329}
330
331fn apply_reduce(
332 reduce: &ReduceFn,
333 keys: &[serde_json::Value],
334 values: &[serde_json::Value],
335 rereduce: bool,
336) -> serde_json::Value {
337 match reduce {
338 ReduceFn::Sum => {
339 let sum: f64 = values.iter().filter_map(|v| v.as_f64()).sum();
340 serde_json::json!(sum)
341 }
342 ReduceFn::Count => {
343 serde_json::json!(values.len())
344 }
345 ReduceFn::Stats => {
346 let nums: Vec<f64> = values.iter().filter_map(|v| v.as_f64()).collect();
347 let count = nums.len();
348 if count == 0 {
349 return serde_json::json!({"sum": 0, "count": 0, "min": 0, "max": 0, "sumsqr": 0});
350 }
351 let sum: f64 = nums.iter().sum();
352 let min = nums.iter().copied().fold(f64::INFINITY, f64::min);
353 let max = nums.iter().copied().fold(f64::NEG_INFINITY, f64::max);
354 let sumsqr: f64 = nums.iter().map(|n| n * n).sum();
355 serde_json::json!({
356 "sum": sum,
357 "count": count,
358 "min": min,
359 "max": max,
360 "sumsqr": sumsqr
361 })
362 }
363 ReduceFn::Custom(f) => f(keys, values, rereduce),
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;
    use rouchdb_core::document::{BulkDocsOptions, Document};
    use std::collections::HashMap;

    /// What a map function hands back for one document.
    type Emitted = Vec<(serde_json::Value, serde_json::Value)>;

    /// Builds one fixture document describing a person.
    fn person(id: &str, name: &str, age: u64, city: &str) -> Document {
        Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data: serde_json::json!({"name": name, "age": age, "city": city}),
            attachments: HashMap::new(),
        }
    }

    /// In-memory database holding Alice (30, NYC), Bob (25, LA) and Charlie (35, NYC).
    async fn setup_db() -> MemoryAdapter {
        let db = MemoryAdapter::new("test");
        let people = vec![
            person("alice", "Alice", 30, "NYC"),
            person("bob", "Bob", 25, "LA"),
            person("charlie", "Charlie", 35, "NYC"),
        ];
        db.bulk_docs(people, BulkDocsOptions::new()).await.unwrap();
        db
    }

    /// Reads `field` from `doc`, falling back to JSON null when it is absent.
    fn field_or_null(doc: &serde_json::Value, field: &str) -> serde_json::Value {
        doc.get(field).cloned().unwrap_or(serde_json::Value::Null)
    }

    /// Map function: emits `(name, 1)`.
    fn emit_name(doc: &serde_json::Value) -> Emitted {
        vec![(field_or_null(doc, "name"), serde_json::json!(1))]
    }

    /// Map function: emits `(city, 1)`.
    fn emit_city(doc: &serde_json::Value) -> Emitted {
        vec![(field_or_null(doc, "city"), serde_json::json!(1))]
    }

    /// Map function: emits `(null, age)`, with age defaulting to 0.
    fn emit_age(doc: &serde_json::Value) -> Emitted {
        let age = doc.get("age").cloned().unwrap_or(serde_json::json!(0));
        vec![(serde_json::Value::Null, age)]
    }

    #[tokio::test]
    async fn map_emits_all() {
        let db = setup_db().await;

        let result = query_view(&db, &emit_name, None, ViewQueryOptions::new())
            .await
            .unwrap();

        assert_eq!(result.total_rows, 3);
        // Rows come back in key order.
        assert_eq!(result.rows[0].key, "Alice");
        assert_eq!(result.rows[1].key, "Bob");
        assert_eq!(result.rows[2].key, "Charlie");
    }

    #[tokio::test]
    async fn reduce_group_level_zero_is_global() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            reduce: true,
            group_level: Some(0),
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &emit_city, Some(&ReduceFn::Count), opts)
            .await
            .unwrap();

        // Level 0 collapses everything into one row.
        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].value, serde_json::json!(3));
    }

    #[tokio::test]
    async fn reduce_grouped_honors_skip_and_limit() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            reduce: true,
            group: true,
            skip: 1,
            limit: Some(1),
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &emit_city, Some(&ReduceFn::Count), opts)
            .await
            .unwrap();

        // Two groups exist (LA, NYC); skipping one leaves NYC.
        assert_eq!(result.total_rows, 2);
        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].key, "NYC");
        assert_eq!(result.rows[0].value, serde_json::json!(2));
    }

    #[tokio::test]
    async fn map_with_key_filter() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            key: Some(serde_json::json!("Bob")),
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &emit_name, None, opts).await.unwrap();

        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].key, "Bob");
    }

    #[tokio::test]
    async fn reduce_sum() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &emit_age, Some(&ReduceFn::Sum), opts)
            .await
            .unwrap();

        assert_eq!(result.rows.len(), 1);
        // 30 + 25 + 35
        assert_eq!(result.rows[0].value, serde_json::json!(90.0));
    }

    #[tokio::test]
    async fn reduce_count() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &emit_city, Some(&ReduceFn::Count), opts)
            .await
            .unwrap();

        assert_eq!(result.rows[0].value, serde_json::json!(3));
    }

    #[tokio::test]
    async fn reduce_group() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            reduce: true,
            group: true,
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &emit_city, Some(&ReduceFn::Count), opts)
            .await
            .unwrap();

        // One row per city, in key order.
        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "LA");
        assert_eq!(result.rows[0].value, serde_json::json!(1));
        assert_eq!(result.rows[1].key, "NYC");
        assert_eq!(result.rows[1].value, serde_json::json!(2));
    }

    #[tokio::test]
    async fn reduce_stats() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &emit_age, Some(&ReduceFn::Stats), opts)
            .await
            .unwrap();

        let stats = &result.rows[0].value;
        assert_eq!(stats["count"], 3);
        assert_eq!(stats["sum"], 90.0);
        assert_eq!(stats["min"], 25.0);
        assert_eq!(stats["max"], 35.0);
    }

    #[tokio::test]
    async fn descending_and_limit() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            descending: true,
            limit: Some(2),
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &emit_name, None, opts).await.unwrap();

        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "Charlie");
        assert_eq!(result.rows[1].key, "Bob");
    }

    #[tokio::test]
    async fn start_end_key_range() {
        let db = setup_db().await;
        let opts = ViewQueryOptions {
            start_key: Some(serde_json::json!("Bob")),
            end_key: Some(serde_json::json!("Charlie")),
            ..ViewQueryOptions::new()
        };

        let result = query_view(&db, &emit_name, None, opts).await.unwrap();

        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "Bob");
        assert_eq!(result.rows[1].key, "Charlie");
    }
}