Skip to main content

rouchdb_views/
engine.rs

1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use rouchdb_core::adapter::Adapter;
5use rouchdb_core::document::*;
6use rouchdb_core::error::Result;
7
8/// A map function that takes a document JSON and returns emitted (key, value) pairs.
9pub type MapFn =
10    Arc<dyn Fn(&serde_json::Value) -> Vec<(serde_json::Value, serde_json::Value)> + Send + Sync>;
11
12/// A persistent view index that is incrementally updated.
13pub struct PersistentViewIndex {
14    pub ddoc: String,
15    pub view_name: String,
16    pub last_seq: Seq,
17    /// doc_id -> list of emitted (key, value) pairs.
18    pub entries: BTreeMap<String, Vec<(serde_json::Value, serde_json::Value)>>,
19}
20
21/// Engine for building and querying persistent views.
22///
23/// Views are defined as Rust closures (map functions). The engine
24/// incrementally updates indexes by reading the changes feed since
25/// the last known sequence.
26pub struct ViewEngine {
27    indexes: HashMap<String, PersistentViewIndex>,
28    map_fns: HashMap<String, MapFn>,
29}
30
31impl ViewEngine {
32    pub fn new() -> Self {
33        Self {
34            indexes: HashMap::new(),
35            map_fns: HashMap::new(),
36        }
37    }
38
39    /// Register a Rust map function for a design doc view.
40    pub fn register_map<F>(&mut self, ddoc: &str, view_name: &str, f: F)
41    where
42        F: Fn(&serde_json::Value) -> Vec<(serde_json::Value, serde_json::Value)>
43            + Send
44            + Sync
45            + 'static,
46    {
47        let key = format!("{}/{}", ddoc, view_name);
48        self.map_fns.insert(key, Arc::new(f));
49    }
50
51    /// Update a view index by fetching changes since the last known seq.
52    pub async fn update_index(
53        &mut self,
54        adapter: &dyn Adapter,
55        ddoc: &str,
56        view_name: &str,
57    ) -> Result<()> {
58        let key = format!("{}/{}", ddoc, view_name);
59
60        let map_fn = self
61            .map_fns
62            .get(&key)
63            .ok_or_else(|| {
64                rouchdb_core::error::RouchError::BadRequest(format!(
65                    "no map function registered for {}/{}",
66                    ddoc, view_name
67                ))
68            })?
69            .clone();
70
71        let index = self
72            .indexes
73            .entry(key)
74            .or_insert_with(|| PersistentViewIndex {
75                ddoc: ddoc.into(),
76                view_name: view_name.into(),
77                last_seq: Seq::default(),
78                entries: BTreeMap::new(),
79            });
80
81        let changes = adapter
82            .changes(ChangesOptions {
83                since: index.last_seq.clone(),
84                include_docs: true,
85                ..Default::default()
86            })
87            .await?;
88
89        for event in &changes.results {
90            // Remove old entries for this doc
91            index.entries.remove(&event.id);
92
93            // Skip design docs and deleted docs
94            if event.deleted || event.id.starts_with("_design/") {
95                continue;
96            }
97
98            if let Some(ref doc) = event.doc {
99                let emitted = map_fn(doc);
100                if !emitted.is_empty() {
101                    index.entries.insert(event.id.clone(), emitted);
102                }
103            }
104        }
105
106        index.last_seq = changes.last_seq;
107        Ok(())
108    }
109
110    /// Get a view index by ddoc/view_name.
111    pub fn get_index(&self, ddoc: &str, view_name: &str) -> Option<&PersistentViewIndex> {
112        let key = format!("{}/{}", ddoc, view_name);
113        self.indexes.get(&key)
114    }
115
116    /// Get all registered index names.
117    pub fn index_names(&self) -> Vec<String> {
118        self.indexes.keys().cloned().collect()
119    }
120
121    /// Remove indexes not in the given set of valid names.
122    pub fn remove_indexes_not_in(&mut self, valid: &std::collections::HashSet<String>) {
123        self.indexes.retain(|k, _| valid.contains(k));
124        self.map_fns.retain(|k, _| valid.contains(k));
125    }
126}
127
128impl Default for ViewEngine {
129    fn default() -> Self {
130        Self::new()
131    }
132}