1use std::cmp::Ordering;
8
9use rouchdb_core::adapter::Adapter;
10use rouchdb_core::collation::collate;
11use rouchdb_core::document::AllDocsOptions;
12use rouchdb_core::error::Result;
13
14#[derive(Debug, Clone)]
16pub struct EmittedRow {
17 pub id: String,
18 pub key: serde_json::Value,
19 pub value: serde_json::Value,
20}
21
22pub enum ReduceFn {
24 Sum,
26 Count,
28 Stats,
30 #[allow(clippy::type_complexity)]
32 Custom(Box<dyn Fn(&[serde_json::Value], &[serde_json::Value], bool) -> serde_json::Value>),
33}
34
35#[derive(Debug, Clone, Default)]
37pub struct ViewQueryOptions {
38 pub key: Option<serde_json::Value>,
40 pub keys: Option<Vec<serde_json::Value>>,
42 pub start_key: Option<serde_json::Value>,
44 pub end_key: Option<serde_json::Value>,
46 pub inclusive_end: bool,
48 pub descending: bool,
50 pub skip: u64,
52 pub limit: Option<u64>,
54 pub include_docs: bool,
56 pub reduce: bool,
58 pub group: bool,
60 pub group_level: Option<u64>,
62 pub stale: StaleOption,
64}
65
66#[derive(Debug, Clone, Default, PartialEq, Eq)]
68pub enum StaleOption {
69 #[default]
71 False,
72 Ok,
74 UpdateAfter,
76}
77
78impl ViewQueryOptions {
79 pub fn new() -> Self {
80 Self {
81 inclusive_end: true,
82 ..Default::default()
83 }
84 }
85}
86
87#[derive(Debug, Clone)]
89pub struct ViewResult {
90 pub total_rows: u64,
91 pub offset: u64,
92 pub rows: Vec<ViewRow>,
93}
94
95#[derive(Debug, Clone)]
97pub struct ViewRow {
98 pub id: Option<String>,
99 pub key: serde_json::Value,
100 pub value: serde_json::Value,
101 pub doc: Option<serde_json::Value>,
102}
103
104pub async fn query_view(
108 adapter: &dyn Adapter,
109 map_fn: &dyn Fn(&serde_json::Value) -> Vec<(serde_json::Value, serde_json::Value)>,
110 reduce_fn: Option<&ReduceFn>,
111 opts: ViewQueryOptions,
112) -> Result<ViewResult> {
113 let all = adapter
115 .all_docs(AllDocsOptions {
116 include_docs: true,
117 ..AllDocsOptions::new()
118 })
119 .await?;
120
121 let mut emitted: Vec<EmittedRow> = Vec::new();
122
123 for row in &all.rows {
124 if let Some(ref doc_json) = row.doc {
125 let pairs = map_fn(doc_json);
126 for (key, value) in pairs {
127 emitted.push(EmittedRow {
128 id: row.id.clone(),
129 key,
130 value,
131 });
132 }
133 }
134 }
135
136 emitted.sort_by(|a, b| {
138 let cmp = collate(&a.key, &b.key);
139 if cmp == Ordering::Equal {
140 a.id.cmp(&b.id)
141 } else {
142 cmp
143 }
144 });
145
146 if opts.descending {
147 emitted.reverse();
148 }
149
150 let emitted = if let Some(ref keys) = opts.keys {
152 let mut ordered_rows = Vec::new();
153 for search_key in keys {
154 for row in &emitted {
155 if collate(&row.key, search_key) == Ordering::Equal {
156 ordered_rows.push(row.clone());
157 }
158 }
159 }
160 ordered_rows
161 } else {
162 filter_by_range(emitted, &opts)
163 };
164
165 let total_rows = emitted.len() as u64;
166
167 if opts.reduce
169 && let Some(reduce) = reduce_fn
170 {
171 let rows = if opts.group || opts.group_level.is_some() {
172 group_reduce(&emitted, reduce, opts.group_level)
173 } else {
174 let keys: Vec<serde_json::Value> = emitted.iter().map(|r| r.key.clone()).collect();
175 let values: Vec<serde_json::Value> = emitted.iter().map(|r| r.value.clone()).collect();
176 let result = apply_reduce(reduce, &keys, &values, false);
177 vec![ViewRow {
178 id: None,
179 key: serde_json::Value::Null,
180 value: result,
181 doc: None,
182 }]
183 };
184
185 return Ok(ViewResult {
186 total_rows: rows.len() as u64,
187 offset: 0,
188 rows,
189 });
190 }
191
192 let skip = opts.skip as usize;
194 let rows: Vec<ViewRow> = emitted
195 .into_iter()
196 .skip(skip)
197 .take(opts.limit.unwrap_or(u64::MAX) as usize)
198 .map(|r| ViewRow {
199 id: Some(r.id),
200 key: r.key,
201 value: r.value,
202 doc: None,
203 })
204 .collect();
205
206 Ok(ViewResult {
207 total_rows,
208 offset: opts.skip,
209 rows,
210 })
211}
212
213fn filter_by_range(rows: Vec<EmittedRow>, opts: &ViewQueryOptions) -> Vec<EmittedRow> {
214 rows.into_iter()
215 .filter(|r| {
216 if let Some(ref key) = opts.key {
217 return collate(&r.key, key) == Ordering::Equal;
218 }
219
220 if let Some(ref start) = opts.start_key {
221 if opts.descending {
222 if collate(&r.key, start) == Ordering::Greater {
223 return false;
224 }
225 } else if collate(&r.key, start) == Ordering::Less {
226 return false;
227 }
228 }
229
230 if let Some(ref end) = opts.end_key {
231 if opts.descending {
232 let cmp = collate(&r.key, end);
233 if opts.inclusive_end {
234 if cmp == Ordering::Less {
235 return false;
236 }
237 } else if cmp != Ordering::Greater {
238 return false;
239 }
240 } else {
241 let cmp = collate(&r.key, end);
242 if opts.inclusive_end {
243 if cmp == Ordering::Greater {
244 return false;
245 }
246 } else if cmp != Ordering::Less {
247 return false;
248 }
249 }
250 }
251
252 true
253 })
254 .collect()
255}
256
257fn group_reduce(rows: &[EmittedRow], reduce: &ReduceFn, group_level: Option<u64>) -> Vec<ViewRow> {
258 if rows.is_empty() {
259 return vec![];
260 }
261
262 let mut result = Vec::new();
263 let mut current_key = group_key(&rows[0].key, group_level);
264 let mut keys = vec![rows[0].key.clone()];
265 let mut values = vec![rows[0].value.clone()];
266
267 for row in &rows[1..] {
268 let gk = group_key(&row.key, group_level);
269 if collate(&gk, ¤t_key) == Ordering::Equal {
270 keys.push(row.key.clone());
271 values.push(row.value.clone());
272 } else {
273 let reduced = apply_reduce(reduce, &keys, &values, false);
275 result.push(ViewRow {
276 id: None,
277 key: current_key,
278 value: reduced,
279 doc: None,
280 });
281
282 current_key = gk;
283 keys = vec![row.key.clone()];
284 values = vec![row.value.clone()];
285 }
286 }
287
288 let reduced = apply_reduce(reduce, &keys, &values, false);
290 result.push(ViewRow {
291 id: None,
292 key: current_key,
293 value: reduced,
294 doc: None,
295 });
296
297 result
298}
299
300fn group_key(key: &serde_json::Value, group_level: Option<u64>) -> serde_json::Value {
301 match group_level {
302 None => key.clone(), Some(level) => {
304 if let Some(arr) = key.as_array() {
305 let truncated: Vec<serde_json::Value> =
306 arr.iter().take(level as usize).cloned().collect();
307 serde_json::Value::Array(truncated)
308 } else {
309 key.clone()
310 }
311 }
312 }
313}
314
315fn apply_reduce(
316 reduce: &ReduceFn,
317 keys: &[serde_json::Value],
318 values: &[serde_json::Value],
319 rereduce: bool,
320) -> serde_json::Value {
321 match reduce {
322 ReduceFn::Sum => {
323 let sum: f64 = values.iter().filter_map(|v| v.as_f64()).sum();
324 serde_json::json!(sum)
325 }
326 ReduceFn::Count => {
327 serde_json::json!(values.len())
328 }
329 ReduceFn::Stats => {
330 let nums: Vec<f64> = values.iter().filter_map(|v| v.as_f64()).collect();
331 let count = nums.len();
332 if count == 0 {
333 return serde_json::json!({"sum": 0, "count": 0, "min": 0, "max": 0, "sumsqr": 0});
334 }
335 let sum: f64 = nums.iter().sum();
336 let min = nums.iter().copied().fold(f64::INFINITY, f64::min);
337 let max = nums.iter().copied().fold(f64::NEG_INFINITY, f64::max);
338 let sumsqr: f64 = nums.iter().map(|n| n * n).sum();
339 serde_json::json!({
340 "sum": sum,
341 "count": count,
342 "min": min,
343 "max": max,
344 "sumsqr": sumsqr
345 })
346 }
347 ReduceFn::Custom(f) => f(keys, values, rereduce),
348 }
349}
350
351#[cfg(test)]
356mod tests {
357 use super::*;
358 use rouchdb_adapter_memory::MemoryAdapter;
359 use rouchdb_core::document::{BulkDocsOptions, Document};
360 use std::collections::HashMap;
361
362 async fn setup_db() -> MemoryAdapter {
363 let db = MemoryAdapter::new("test");
364 let docs = vec![
365 Document {
366 id: "alice".into(),
367 rev: None,
368 deleted: false,
369 data: serde_json::json!({"name": "Alice", "age": 30, "city": "NYC"}),
370 attachments: HashMap::new(),
371 },
372 Document {
373 id: "bob".into(),
374 rev: None,
375 deleted: false,
376 data: serde_json::json!({"name": "Bob", "age": 25, "city": "LA"}),
377 attachments: HashMap::new(),
378 },
379 Document {
380 id: "charlie".into(),
381 rev: None,
382 deleted: false,
383 data: serde_json::json!({"name": "Charlie", "age": 35, "city": "NYC"}),
384 attachments: HashMap::new(),
385 },
386 ];
387 db.bulk_docs(docs, BulkDocsOptions::new()).await.unwrap();
388 db
389 }
390
391 #[tokio::test]
392 async fn map_emits_all() {
393 let db = setup_db().await;
394
395 let result = query_view(
396 &db,
397 &|doc| {
398 let name = doc.get("name").cloned().unwrap_or(serde_json::Value::Null);
399 vec![(name, serde_json::json!(1))]
400 },
401 None,
402 ViewQueryOptions::new(),
403 )
404 .await
405 .unwrap();
406
407 assert_eq!(result.total_rows, 3);
408 assert_eq!(result.rows[0].key, "Alice");
410 assert_eq!(result.rows[1].key, "Bob");
411 assert_eq!(result.rows[2].key, "Charlie");
412 }
413
414 #[tokio::test]
415 async fn map_with_key_filter() {
416 let db = setup_db().await;
417
418 let result = query_view(
419 &db,
420 &|doc| {
421 let name = doc.get("name").cloned().unwrap_or(serde_json::Value::Null);
422 vec![(name, serde_json::json!(1))]
423 },
424 None,
425 ViewQueryOptions {
426 key: Some(serde_json::json!("Bob")),
427 ..ViewQueryOptions::new()
428 },
429 )
430 .await
431 .unwrap();
432
433 assert_eq!(result.rows.len(), 1);
434 assert_eq!(result.rows[0].key, "Bob");
435 }
436
437 #[tokio::test]
438 async fn reduce_sum() {
439 let db = setup_db().await;
440
441 let result = query_view(
442 &db,
443 &|doc| {
444 let age = doc.get("age").cloned().unwrap_or(serde_json::json!(0));
445 vec![(serde_json::Value::Null, age)]
446 },
447 Some(&ReduceFn::Sum),
448 ViewQueryOptions {
449 reduce: true,
450 ..ViewQueryOptions::new()
451 },
452 )
453 .await
454 .unwrap();
455
456 assert_eq!(result.rows.len(), 1);
457 assert_eq!(result.rows[0].value, serde_json::json!(90.0)); }
459
460 #[tokio::test]
461 async fn reduce_count() {
462 let db = setup_db().await;
463
464 let result = query_view(
465 &db,
466 &|doc| {
467 let city = doc.get("city").cloned().unwrap_or(serde_json::Value::Null);
468 vec![(city, serde_json::json!(1))]
469 },
470 Some(&ReduceFn::Count),
471 ViewQueryOptions {
472 reduce: true,
473 ..ViewQueryOptions::new()
474 },
475 )
476 .await
477 .unwrap();
478
479 assert_eq!(result.rows[0].value, serde_json::json!(3));
480 }
481
482 #[tokio::test]
483 async fn reduce_group() {
484 let db = setup_db().await;
485
486 let result = query_view(
487 &db,
488 &|doc| {
489 let city = doc.get("city").cloned().unwrap_or(serde_json::Value::Null);
490 vec![(city, serde_json::json!(1))]
491 },
492 Some(&ReduceFn::Count),
493 ViewQueryOptions {
494 reduce: true,
495 group: true,
496 ..ViewQueryOptions::new()
497 },
498 )
499 .await
500 .unwrap();
501
502 assert_eq!(result.rows.len(), 2); assert_eq!(result.rows[0].key, "LA");
505 assert_eq!(result.rows[0].value, serde_json::json!(1));
506 assert_eq!(result.rows[1].key, "NYC");
507 assert_eq!(result.rows[1].value, serde_json::json!(2));
508 }
509
510 #[tokio::test]
511 async fn reduce_stats() {
512 let db = setup_db().await;
513
514 let result = query_view(
515 &db,
516 &|doc| {
517 let age = doc.get("age").cloned().unwrap_or(serde_json::json!(0));
518 vec![(serde_json::Value::Null, age)]
519 },
520 Some(&ReduceFn::Stats),
521 ViewQueryOptions {
522 reduce: true,
523 ..ViewQueryOptions::new()
524 },
525 )
526 .await
527 .unwrap();
528
529 let stats = &result.rows[0].value;
530 assert_eq!(stats["count"], 3);
531 assert_eq!(stats["sum"], 90.0);
532 assert_eq!(stats["min"], 25.0);
533 assert_eq!(stats["max"], 35.0);
534 }
535
536 #[tokio::test]
537 async fn descending_and_limit() {
538 let db = setup_db().await;
539
540 let result = query_view(
541 &db,
542 &|doc| {
543 let name = doc.get("name").cloned().unwrap_or(serde_json::Value::Null);
544 vec![(name, serde_json::json!(1))]
545 },
546 None,
547 ViewQueryOptions {
548 descending: true,
549 limit: Some(2),
550 ..ViewQueryOptions::new()
551 },
552 )
553 .await
554 .unwrap();
555
556 assert_eq!(result.rows.len(), 2);
557 assert_eq!(result.rows[0].key, "Charlie");
558 assert_eq!(result.rows[1].key, "Bob");
559 }
560
561 #[tokio::test]
562 async fn start_end_key_range() {
563 let db = setup_db().await;
564
565 let result = query_view(
566 &db,
567 &|doc| {
568 let name = doc.get("name").cloned().unwrap_or(serde_json::Value::Null);
569 vec![(name, serde_json::json!(1))]
570 },
571 None,
572 ViewQueryOptions {
573 start_key: Some(serde_json::json!("Bob")),
574 end_key: Some(serde_json::json!("Charlie")),
575 ..ViewQueryOptions::new()
576 },
577 )
578 .await
579 .unwrap();
580
581 assert_eq!(result.rows.len(), 2);
582 assert_eq!(result.rows[0].key, "Bob");
583 assert_eq!(result.rows[1].key, "Charlie");
584 }
585}