1use std::cmp::Ordering;
8
9use rouchdb_core::adapter::Adapter;
10use rouchdb_core::collation::collate;
11use rouchdb_core::document::AllDocsOptions;
12use rouchdb_core::error::Result;
13
/// One key/value pair produced by a map function, tagged with the document
/// it came from.
#[derive(Debug, Clone)]
pub struct EmittedRow {
    /// Id of the document the map function was applied to.
    pub id: String,
    /// Emitted key; rows are ordered and range-filtered by this value.
    pub key: serde_json::Value,
    /// Emitted value; this is what reduce functions consume.
    pub value: serde_json::Value,
}
21
/// Reduce strategy applied to the values emitted by a map function.
pub enum ReduceFn {
    /// Adds up every value that can be read as an `f64`; other values are
    /// skipped.
    Sum,
    /// Counts the emitted values.
    Count,
    /// Produces an object with `sum`, `count`, `min`, `max` and `sumsqr`
    /// computed over the values that can be read as an `f64`.
    Stats,
    /// Caller-supplied reducer, invoked as `f(keys, values, rereduce)`.
    #[allow(clippy::type_complexity)]
    Custom(Box<dyn Fn(&[serde_json::Value], &[serde_json::Value], bool) -> serde_json::Value>),
}
34
35#[derive(Debug, Clone, Default)]
37pub struct ViewQueryOptions {
38 pub key: Option<serde_json::Value>,
40 pub start_key: Option<serde_json::Value>,
42 pub end_key: Option<serde_json::Value>,
44 pub inclusive_end: bool,
46 pub descending: bool,
48 pub skip: u64,
50 pub limit: Option<u64>,
52 pub include_docs: bool,
54 pub reduce: bool,
56 pub group: bool,
58 pub group_level: Option<u64>,
60}
61
62impl ViewQueryOptions {
63 pub fn new() -> Self {
64 Self {
65 inclusive_end: true,
66 ..Default::default()
67 }
68 }
69}
70
/// Outcome of a view query.
#[derive(Debug, Clone)]
pub struct ViewResult {
    /// Number of rows the query produced before any pagination was applied.
    pub total_rows: u64,
    /// Offset reported alongside the rows; mirrors the requested `skip` for
    /// map queries.
    pub offset: u64,
    /// The returned rows, in query order.
    pub rows: Vec<ViewRow>,
}
78
/// A single row of a [`ViewResult`].
#[derive(Debug, Clone)]
pub struct ViewRow {
    /// Id of the emitting document for map rows; `None` for reduced rows.
    pub id: Option<String>,
    /// Emitted key for map rows, or the group key (`null` when ungrouped)
    /// for reduced rows.
    pub key: serde_json::Value,
    /// Emitted value for map rows, or the reduce output for reduced rows.
    pub value: serde_json::Value,
    /// Document attached to the row, if any.
    pub doc: Option<serde_json::Value>,
}
87
88pub async fn query_view(
92 adapter: &dyn Adapter,
93 map_fn: &dyn Fn(&serde_json::Value) -> Vec<(serde_json::Value, serde_json::Value)>,
94 reduce_fn: Option<&ReduceFn>,
95 opts: ViewQueryOptions,
96) -> Result<ViewResult> {
97 let all = adapter
99 .all_docs(AllDocsOptions {
100 include_docs: true,
101 ..AllDocsOptions::new()
102 })
103 .await?;
104
105 let mut emitted: Vec<EmittedRow> = Vec::new();
106
107 for row in &all.rows {
108 if let Some(ref doc_json) = row.doc {
109 let pairs = map_fn(doc_json);
110 for (key, value) in pairs {
111 emitted.push(EmittedRow {
112 id: row.id.clone(),
113 key,
114 value,
115 });
116 }
117 }
118 }
119
120 emitted.sort_by(|a, b| {
122 let cmp = collate(&a.key, &b.key);
123 if cmp == Ordering::Equal {
124 a.id.cmp(&b.id)
125 } else {
126 cmp
127 }
128 });
129
130 if opts.descending {
131 emitted.reverse();
132 }
133
134 let emitted = filter_by_range(emitted, &opts);
136
137 let total_rows = emitted.len() as u64;
138
139 if opts.reduce
141 && let Some(reduce) = reduce_fn {
142 let rows = if opts.group || opts.group_level.is_some() {
143 group_reduce(&emitted, reduce, opts.group_level)
144 } else {
145 let keys: Vec<serde_json::Value> = emitted.iter().map(|r| r.key.clone()).collect();
146 let values: Vec<serde_json::Value> =
147 emitted.iter().map(|r| r.value.clone()).collect();
148 let result = apply_reduce(reduce, &keys, &values, false);
149 vec![ViewRow {
150 id: None,
151 key: serde_json::Value::Null,
152 value: result,
153 doc: None,
154 }]
155 };
156
157 return Ok(ViewResult {
158 total_rows: rows.len() as u64,
159 offset: 0,
160 rows,
161 });
162 }
163
164 let skip = opts.skip as usize;
166 let rows: Vec<ViewRow> = emitted
167 .into_iter()
168 .skip(skip)
169 .take(opts.limit.unwrap_or(u64::MAX) as usize)
170 .map(|r| ViewRow {
171 id: Some(r.id),
172 key: r.key,
173 value: r.value,
174 doc: None,
175 })
176 .collect();
177
178 Ok(ViewResult {
179 total_rows,
180 offset: opts.skip,
181 rows,
182 })
183}
184
185fn filter_by_range(rows: Vec<EmittedRow>, opts: &ViewQueryOptions) -> Vec<EmittedRow> {
186 rows.into_iter()
187 .filter(|r| {
188 if let Some(ref key) = opts.key {
189 return collate(&r.key, key) == Ordering::Equal;
190 }
191
192 if let Some(ref start) = opts.start_key {
193 if opts.descending {
194 if collate(&r.key, start) == Ordering::Greater {
195 return false;
196 }
197 } else if collate(&r.key, start) == Ordering::Less {
198 return false;
199 }
200 }
201
202 if let Some(ref end) = opts.end_key {
203 if opts.descending {
204 let cmp = collate(&r.key, end);
205 if opts.inclusive_end {
206 if cmp == Ordering::Less {
207 return false;
208 }
209 } else if cmp != Ordering::Greater {
210 return false;
211 }
212 } else {
213 let cmp = collate(&r.key, end);
214 if opts.inclusive_end {
215 if cmp == Ordering::Greater {
216 return false;
217 }
218 } else if cmp != Ordering::Less {
219 return false;
220 }
221 }
222 }
223
224 true
225 })
226 .collect()
227}
228
229fn group_reduce(rows: &[EmittedRow], reduce: &ReduceFn, group_level: Option<u64>) -> Vec<ViewRow> {
230 if rows.is_empty() {
231 return vec![];
232 }
233
234 let mut result = Vec::new();
235 let mut current_key = group_key(&rows[0].key, group_level);
236 let mut keys = vec![rows[0].key.clone()];
237 let mut values = vec![rows[0].value.clone()];
238
239 for row in &rows[1..] {
240 let gk = group_key(&row.key, group_level);
241 if collate(&gk, ¤t_key) == Ordering::Equal {
242 keys.push(row.key.clone());
243 values.push(row.value.clone());
244 } else {
245 let reduced = apply_reduce(reduce, &keys, &values, false);
247 result.push(ViewRow {
248 id: None,
249 key: current_key,
250 value: reduced,
251 doc: None,
252 });
253
254 current_key = gk;
255 keys = vec![row.key.clone()];
256 values = vec![row.value.clone()];
257 }
258 }
259
260 let reduced = apply_reduce(reduce, &keys, &values, false);
262 result.push(ViewRow {
263 id: None,
264 key: current_key,
265 value: reduced,
266 doc: None,
267 });
268
269 result
270}
271
272fn group_key(key: &serde_json::Value, group_level: Option<u64>) -> serde_json::Value {
273 match group_level {
274 None => key.clone(), Some(level) => {
276 if let Some(arr) = key.as_array() {
277 let truncated: Vec<serde_json::Value> =
278 arr.iter().take(level as usize).cloned().collect();
279 serde_json::Value::Array(truncated)
280 } else {
281 key.clone()
282 }
283 }
284 }
285}
286
287fn apply_reduce(
288 reduce: &ReduceFn,
289 keys: &[serde_json::Value],
290 values: &[serde_json::Value],
291 rereduce: bool,
292) -> serde_json::Value {
293 match reduce {
294 ReduceFn::Sum => {
295 let sum: f64 = values.iter().filter_map(|v| v.as_f64()).sum();
296 serde_json::json!(sum)
297 }
298 ReduceFn::Count => {
299 serde_json::json!(values.len())
300 }
301 ReduceFn::Stats => {
302 let nums: Vec<f64> = values.iter().filter_map(|v| v.as_f64()).collect();
303 let count = nums.len();
304 if count == 0 {
305 return serde_json::json!({"sum": 0, "count": 0, "min": 0, "max": 0, "sumsqr": 0});
306 }
307 let sum: f64 = nums.iter().sum();
308 let min = nums.iter().copied().fold(f64::INFINITY, f64::min);
309 let max = nums.iter().copied().fold(f64::NEG_INFINITY, f64::max);
310 let sumsqr: f64 = nums.iter().map(|n| n * n).sum();
311 serde_json::json!({
312 "sum": sum,
313 "count": count,
314 "min": min,
315 "max": max,
316 "sumsqr": sumsqr
317 })
318 }
319 ReduceFn::Custom(f) => f(keys, values, rereduce),
320 }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;
    use rouchdb_core::document::{BulkDocsOptions, Document};
    use std::collections::HashMap;

    /// Pairs emitted by a map function.
    type Emitted = Vec<(serde_json::Value, serde_json::Value)>;

    /// Builds a fresh, unsaved person document.
    fn person(id: &str, name: &str, age: u64, city: &str) -> Document {
        Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data: serde_json::json!({"name": name, "age": age, "city": city}),
            attachments: HashMap::new(),
        }
    }

    /// In-memory database holding Alice (30, NYC), Bob (25, LA) and
    /// Charlie (35, NYC).
    async fn setup_db() -> MemoryAdapter {
        let db = MemoryAdapter::new("test");
        let docs = vec![
            person("alice", "Alice", 30, "NYC"),
            person("bob", "Bob", 25, "LA"),
            person("charlie", "Charlie", 35, "NYC"),
        ];
        db.bulk_docs(docs, BulkDocsOptions::new()).await.unwrap();
        db
    }

    /// Reads `field` from `doc`, substituting `fallback` when it is missing.
    fn field_or(
        doc: &serde_json::Value,
        field: &str,
        fallback: serde_json::Value,
    ) -> serde_json::Value {
        doc.get(field).cloned().unwrap_or(fallback)
    }

    /// Map function: emits `(name, 1)`.
    fn by_name(doc: &serde_json::Value) -> Emitted {
        vec![(
            field_or(doc, "name", serde_json::Value::Null),
            serde_json::json!(1),
        )]
    }

    /// Map function: emits `(city, 1)`.
    fn by_city(doc: &serde_json::Value) -> Emitted {
        vec![(
            field_or(doc, "city", serde_json::Value::Null),
            serde_json::json!(1),
        )]
    }

    /// Map function: emits `(null, age)`.
    fn ages(doc: &serde_json::Value) -> Emitted {
        vec![(
            serde_json::Value::Null,
            field_or(doc, "age", serde_json::json!(0)),
        )]
    }

    /// Runs `query_view` against a freshly populated database.
    async fn run(
        map_fn: fn(&serde_json::Value) -> Emitted,
        reduce_fn: Option<&ReduceFn>,
        opts: ViewQueryOptions,
    ) -> ViewResult {
        let db = setup_db().await;
        query_view(&db, &map_fn, reduce_fn, opts).await.unwrap()
    }

    #[tokio::test]
    async fn map_emits_all() {
        let result = run(by_name, None, ViewQueryOptions::new()).await;

        assert_eq!(result.total_rows, 3);
        assert_eq!(result.rows[0].key, "Alice");
        assert_eq!(result.rows[1].key, "Bob");
        assert_eq!(result.rows[2].key, "Charlie");
    }

    #[tokio::test]
    async fn map_with_key_filter() {
        let opts = ViewQueryOptions {
            key: Some(serde_json::json!("Bob")),
            ..ViewQueryOptions::new()
        };
        let result = run(by_name, None, opts).await;

        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].key, "Bob");
    }

    #[tokio::test]
    async fn reduce_sum() {
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };
        let result = run(ages, Some(&ReduceFn::Sum), opts).await;

        assert_eq!(result.rows.len(), 1);
        assert_eq!(result.rows[0].value, serde_json::json!(90.0));
    }

    #[tokio::test]
    async fn reduce_count() {
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };
        let result = run(by_city, Some(&ReduceFn::Count), opts).await;

        assert_eq!(result.rows[0].value, serde_json::json!(3));
    }

    #[tokio::test]
    async fn reduce_group() {
        let opts = ViewQueryOptions {
            reduce: true,
            group: true,
            ..ViewQueryOptions::new()
        };
        let result = run(by_city, Some(&ReduceFn::Count), opts).await;

        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "LA");
        assert_eq!(result.rows[0].value, serde_json::json!(1));
        assert_eq!(result.rows[1].key, "NYC");
        assert_eq!(result.rows[1].value, serde_json::json!(2));
    }

    #[tokio::test]
    async fn reduce_stats() {
        let opts = ViewQueryOptions {
            reduce: true,
            ..ViewQueryOptions::new()
        };
        let result = run(ages, Some(&ReduceFn::Stats), opts).await;

        let stats = &result.rows[0].value;
        assert_eq!(stats["count"], 3);
        assert_eq!(stats["sum"], 90.0);
        assert_eq!(stats["min"], 25.0);
        assert_eq!(stats["max"], 35.0);
    }

    #[tokio::test]
    async fn descending_and_limit() {
        let opts = ViewQueryOptions {
            descending: true,
            limit: Some(2),
            ..ViewQueryOptions::new()
        };
        let result = run(by_name, None, opts).await;

        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "Charlie");
        assert_eq!(result.rows[1].key, "Bob");
    }

    #[tokio::test]
    async fn start_end_key_range() {
        let opts = ViewQueryOptions {
            start_key: Some(serde_json::json!("Bob")),
            end_key: Some(serde_json::json!("Charlie")),
            ..ViewQueryOptions::new()
        };
        let result = run(by_name, None, opts).await;

        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0].key, "Bob");
        assert_eq!(result.rows[1].key, "Charlie");
    }
}