use std::sync::Arc;
use loro_common::TreeID;
use smallvec::SmallVec;
use crate::{
event::{Diff, Index},
jsonpath::{
ast::{Query, Segment, Selector},
JSONPathParser,
},
utils::subscription::Subscription,
LoroDoc, LoroError, LoroResult,
};
/// Callback type accepted by `LoroDoc::subscribe_jsonpath`.
///
/// It takes no arguments and returns nothing; it is shared behind an `Arc`
/// and must be `Send + Sync + 'static`.
pub type SubscribeJsonPathCallback = Arc<dyn Fn() + Send + Sync + 'static>;
/// One element of a concrete path that is compared against the query steps.
///
/// Values are produced from event `Index` entries, or built by hand when the
/// subscription extends a container path with a changed key or child slot.
#[derive(Debug, Clone, PartialEq, Eq)]
enum PathElem {
/// A key, compared by string equality against `MatchSelector::Name`.
Key(Arc<str>),
/// A sequence position. `None` is a placeholder that `selector_matches`
/// accepts for every `MatchSelector::Index`, whatever its value.
Seq(Option<usize>),
/// A tree node identifier; only a wildcard selector matches it.
Node(TreeID),
}
/// Converts an event path `Index` into the matcher's own [`PathElem`].
impl From<&Index> for PathElem {
    /// Maps each `Index` variant onto the `PathElem` variant of the same name;
    /// sequence indices are wrapped in `Some` because their position is known.
    fn from(value: &Index) -> Self {
        match value {
            Index::Key(key) => Self::Key(key.as_ref().into()),
            Index::Seq(position) => Self::Seq(Some(*position)),
            Index::Node(tree_id) => Self::Node(*tree_id),
        }
    }
}
/// Simplified selector used by the matcher.
///
/// `to_match_selector` maps name selectors and non-negative index selectors
/// onto `Name` and `Index`; every other selector becomes `Wildcard`.
#[derive(Debug, Clone)]
enum MatchSelector {
/// Matches a `PathElem::Key` with exactly this name.
Name(Arc<str>),
/// Matches a `PathElem::Seq` with this position, or with an unknown (`None`) position.
Index(usize),
/// Matches any path element.
Wildcard,
}
/// One step of a compiled query, built from a single `Segment`.
#[derive(Debug, Clone)]
struct Step {
/// `true` for steps built from `Segment::Recursive`; the matcher may then
/// stay on this step while consuming a path element.
recursive: bool,
/// Alternatives for this step; the step matches when any one of them matches.
selectors: SmallVec<[MatchSelector; 2]>,
}
/// Compiled form of a JSONPath query used to test concrete paths.
#[derive(Debug, Clone)]
struct JsonPathMatcher {
/// Query steps in root-to-leaf order, as pushed by `build_steps`.
steps: Vec<Step>,
}
impl JsonPathMatcher {
    /// Compiles `query` into a flat list of steps.
    fn new(query: &Query) -> Self {
        let mut steps = Vec::new();
        build_steps(&query.segments, &mut steps);
        Self { steps }
    }

    /// Returns `true` when `path` can consume every step of the query.
    ///
    /// A query without steps matches every path.
    fn may_match(&self, path: &[PathElem]) -> bool {
        let step_count = self.steps.len();
        if step_count == 0 {
            return true;
        }
        self.positions_after(path)
            .iter()
            .any(|&reached| reached >= step_count)
    }

    /// Returns `true` when at least one of `positions` sits directly after a
    /// step that contains a wildcard selector.
    #[inline]
    fn passed_through_wildcard(&self, positions: &[usize]) -> bool {
        positions
            .iter()
            // Position 0 has no preceding step to inspect.
            .filter(|&&pos| pos > 0)
            .any(|&pos| {
                self.steps[pos - 1]
                    .selectors
                    .iter()
                    .any(|selector| matches!(selector, MatchSelector::Wildcard))
            })
    }

    /// Feeds `path` through the steps and returns the sorted, de-duplicated
    /// set of step indices that are still reachable afterwards.
    ///
    /// An index equal to `self.steps.len()` means every step has been
    /// consumed. The result is empty as soon as one path element leaves no
    /// reachable step.
    fn positions_after(&self, path: &[PathElem]) -> SmallVec<[usize; 8]> {
        let mut current: SmallVec<[usize; 8]> = SmallVec::new();
        current.push(0);

        for elem in path {
            let mut reachable: SmallVec<[usize; 8]> = SmallVec::new();
            for &pos in current.iter() {
                match self.steps.get(pos) {
                    // All steps already consumed: deeper elements keep the match alive.
                    None => reachable.push(pos),
                    Some(step) => {
                        // A recursive step may skip this element and stay in place.
                        if step.recursive {
                            reachable.push(pos);
                        }
                        if selector_matches(&step.selectors, elem) {
                            reachable.push(pos + 1);
                        }
                    }
                }
            }
            dedup_positions(&mut reachable);
            if reachable.is_empty() {
                return reachable;
            }
            current = reachable;
        }

        current
    }
}
/// Returns `true` when at least one of `selectors` accepts `elem`.
///
/// A wildcard accepts everything, a name accepts only an equal key, and an
/// index accepts an equal sequence position or an unknown (`None`) position.
fn selector_matches(selectors: &[MatchSelector], elem: &PathElem) -> bool {
    selectors.iter().any(|selector| match (selector, elem) {
        (MatchSelector::Wildcard, _) => true,
        (MatchSelector::Name(expected), PathElem::Key(actual)) => actual == expected,
        // Unknown position: any index selector is accepted.
        (MatchSelector::Index(_), PathElem::Seq(None)) => true,
        (MatchSelector::Index(expected), PathElem::Seq(Some(actual))) => actual == expected,
        (MatchSelector::Name(_), _) | (MatchSelector::Index(_), _) => false,
    })
}
/// Lowers a parsed `Selector` into the simplified [`MatchSelector`].
///
/// Names and non-negative indices are kept as they are. Every other selector,
/// including a negative index, is widened to `MatchSelector::Wildcard`.
fn to_match_selector(sel: &Selector) -> MatchSelector {
    if let Selector::Name { name } = sel {
        return MatchSelector::Name(name.as_str().into());
    }
    match sel {
        Selector::Index { index } if *index >= 0 => MatchSelector::Index(*index as usize),
        _ => MatchSelector::Wildcard,
    }
}
/// Appends the steps described by `segment` to `steps` in root-to-leaf order.
///
/// The `left` part of a segment is processed first, so the step for `segment`
/// itself is pushed last. `Segment::Root` contributes no step.
fn build_steps(segment: &Segment, steps: &mut Vec<Step>) {
    let (left, selectors, recursive) = match segment {
        Segment::Root {} => return,
        Segment::Child { left, selectors } => (left, selectors, false),
        Segment::Recursive { left, selectors } => (left, selectors, true),
    };

    build_steps(left, steps);
    steps.push(Step {
        recursive,
        selectors: selectors.iter().map(to_match_selector).collect(),
    });
}
/// Sorts `v` in ascending order and removes duplicate positions in place.
#[inline]
fn dedup_positions(v: &mut SmallVec<[usize; 8]>) {
    // Sorting first puts equal positions next to each other, so removing
    // adjacent duplicates removes every duplicate.
    v.sort_unstable();
    v.dedup_by(|current, previous| current == previous);
}
impl LoroDoc {
    /// Subscribes `callback` to changes that may affect the result of `jsonpath`.
    ///
    /// The query is parsed once and compiled into a `JsonPathMatcher`. Every
    /// event delivered through `subscribe_root` is checked against it, and
    /// `callback` is invoked at most once per event, as soon as one container
    /// diff looks relevant. Matching only inspects paths and is conservative,
    /// so the callback can fire even when the query result did not change.
    ///
    /// # Errors
    ///
    /// Returns `LoroError::ArgErr` carrying the parser's message when
    /// `jsonpath` cannot be parsed.
    #[cfg(feature = "jsonpath")]
    pub fn subscribe_jsonpath(
        &self,
        jsonpath: &str,
        callback: SubscribeJsonPathCallback,
    ) -> LoroResult<Subscription> {
        let query = match JSONPathParser::new().parse(jsonpath) {
            Ok(query) => query,
            Err(err) => return Err(LoroError::ArgErr(err.to_string().into_boxed_str())),
        };
        let matcher = Arc::new(JsonPathMatcher::new(&query));

        let subscription = self.subscribe_root(Arc::new(move |event| {
            let relevant = event.events.iter().any(|container_diff| {
                let base_path: SmallVec<[PathElem; 8]> = container_diff
                    .path
                    .iter()
                    .map(|(_, idx)| PathElem::from(idx))
                    .collect();

                // The changed container itself lies on a path the query selects.
                if matcher.may_match(&base_path) {
                    return true;
                }

                // Map diffs: test each updated key as one more path element.
                if let Diff::Map(map) = &container_diff.diff {
                    let base_positions = matcher.positions_after(&base_path);
                    if base_positions.is_empty() {
                        return false;
                    }
                    // After a wildcard step any key counts as relevant.
                    let past_wildcard = matcher.passed_through_wildcard(&base_positions);
                    let key_hit = map.updated.keys().any(|key| {
                        let mut key_path = base_path.clone();
                        key_path.push(PathElem::Key(key.as_ref().into()));
                        let reachable = !matcher.positions_after(&key_path).is_empty();
                        reachable || past_wildcard
                    });
                    if key_hit {
                        return true;
                    }
                }

                let has_child_changes = matches!(
                    &container_diff.diff,
                    Diff::List(_) | Diff::Tree(_) | Diff::Unknown
                );
                #[cfg(feature = "counter")]
                let has_child_changes =
                    has_child_changes || matches!(&container_diff.diff, Diff::Counter(_));
                if !has_child_changes {
                    return false;
                }

                // The affected child slot is not known, so probe with a
                // placeholder position that every index selector accepts.
                let mut child_path = base_path;
                child_path.push(PathElem::Seq(None));
                !matcher.positions_after(&child_path).is_empty()
            });

            if relevant {
                (callback)();
            }
        }));

        Ok(subscription)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{LoroDoc, MapHandler};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Inserts a book map at `idx` of the root list `"books"` and returns its handler.
    fn make_book(
        doc: &LoroDoc,
        idx: usize,
        title: &str,
        available: bool,
        price: i64,
    ) -> MapHandler {
        let book = doc
            .get_list("books")
            .insert_container(idx, MapHandler::new_detached())
            .unwrap();
        book.insert("title", title).unwrap();
        book.insert("available", available).unwrap();
        book.insert("price", price).unwrap();
        book
    }

    /// Subscribes to `jsonpath` with a callback that counts its invocations.
    ///
    /// The returned subscription must be kept alive by the caller for as long
    /// as the counter is expected to be updated.
    fn count_hits(doc: &LoroDoc, jsonpath: &str) -> (Arc<AtomicUsize>, Subscription) {
        let hits = Arc::new(AtomicUsize::new(0));
        let counter = hits.clone();
        let sub = doc
            .subscribe_jsonpath(
                jsonpath,
                Arc::new(move || {
                    counter.fetch_add(1, Ordering::SeqCst);
                }),
            )
            .unwrap();
        (hits, sub)
    }

    #[test]
    fn jsonpath_subscribe_triggers_on_specific_key() {
        let doc = LoroDoc::new_auto_commit();
        let first_book = make_book(&doc, 0, "Old", true, 10);
        doc.commit_then_renew();

        let (hits, _sub) = count_hits(&doc, "$.books[0].title");

        first_book.insert("title", "New").unwrap();
        doc.commit_then_renew();
        assert!(hits.load(Ordering::SeqCst) >= 1);
    }

    #[test]
    fn jsonpath_subscribe_wildcard_on_list() {
        let doc = LoroDoc::new_auto_commit();
        make_book(&doc, 0, "A", true, 10);
        let second_book = make_book(&doc, 1, "B", true, 20);
        doc.commit_then_renew();

        let (hits, _sub) = count_hits(&doc, "$.books[*].price");

        second_book.insert("price", 25).unwrap();
        doc.commit_then_renew();
        assert!(hits.load(Ordering::SeqCst) >= 1);
    }

    #[test]
    fn jsonpath_subscribe_negative_index() {
        let doc = LoroDoc::new_auto_commit();
        make_book(&doc, 0, "A", true, 10);
        make_book(&doc, 1, "B", true, 20);
        doc.commit_then_renew();

        let (hits, _sub) = count_hits(&doc, "$.books[-1].title");

        let books = doc.get_list("books");
        let last = books.get_child_handler(1).unwrap();
        last.as_map().unwrap().insert("title", "B updated").unwrap();
        doc.commit_then_renew();
        assert!(hits.load(Ordering::SeqCst) >= 1);
    }

    #[test]
    fn jsonpath_subscribe_slice_range() {
        let doc = LoroDoc::new_auto_commit();
        make_book(&doc, 0, "A", true, 10);
        let second_book = make_book(&doc, 1, "B", true, 20);
        make_book(&doc, 2, "C", true, 30);
        doc.commit_then_renew();

        let (hits, _sub) = count_hits(&doc, "$.books[0:2].title");

        second_book.insert("title", "B updated").unwrap();
        doc.commit_then_renew();
        assert!(hits.load(Ordering::SeqCst) >= 1);
    }

    #[test]
    fn jsonpath_subscribe_recursive() {
        let doc = LoroDoc::new_auto_commit();
        let store = doc.get_map("store");
        let nested = store
            .insert_container("inventory", MapHandler::new_detached())
            .unwrap();
        nested.insert("total", 3).unwrap();
        doc.commit_then_renew();

        let (hits, _sub) = count_hits(&doc, "$..total");

        nested.insert("total", 4).unwrap();
        doc.commit_then_renew();
        assert!(hits.load(Ordering::SeqCst) >= 1);
    }

    #[test]
    fn jsonpath_subscribe_filter_treated_as_wildcard() {
        let doc = LoroDoc::new_auto_commit();
        make_book(&doc, 0, "A", true, 10);
        let second_book = make_book(&doc, 1, "B", false, 20);
        doc.commit_then_renew();

        let (hits, _sub) = count_hits(&doc, "$.books[?@.available].title");

        // Changing the key the filter reads must still notify the subscriber.
        second_book.insert("available", true).unwrap();
        doc.commit_then_renew();
        assert!(hits.load(Ordering::SeqCst) >= 1);
    }

    #[test]
    fn jsonpath_subscribe_triggers_once_per_commit() {
        let doc = LoroDoc::new_auto_commit();
        let book = make_book(&doc, 0, "A", true, 10);
        doc.commit_then_renew();

        let (hits, _sub) = count_hits(&doc, "$.books[0].title");

        // Two writes inside a single commit must produce a single notification.
        book.insert("title", "X").unwrap();
        book.insert("title", "Y").unwrap();
        doc.commit_then_renew();
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }
}