1use crate::{
2 error::Error,
3 key::{concat, concat_sep, encode_replace_key, u16_to_ver, u64_to_ver, IndexKey},
4 ArchivedEventIndex, Event, EventIndex, Filter, FromEventData, Stats,
5};
6use nostr_kv::{
7 lmdb::{Db as Lmdb, Iter as LmdbIter, *},
8 scanner::{Group, GroupItem, MatchResult, Scanner},
9};
10
11use std::{
12 marker::PhantomData,
13 ops::Bound,
14 path::Path,
15 sync::{
16 atomic::{AtomicU64, Ordering},
17 Arc,
18 },
19 time::{Duration, Instant},
20};
21
/// Module-local `Result` alias whose error type defaults to [`Error`].
type Result<T, E = Error> = core::result::Result<T, E>;
23
/// Returns the smallest byte string that is strictly greater than every
/// string starting with `key`, or `None` when no such bound exists.
///
/// Trailing `0xFF` bytes are dropped and the last remaining byte is
/// incremented. An empty or all-`0xFF` key has no upper bound.
pub fn upper(mut key: Vec<u8>) -> Option<Vec<u8>> {
    // Trailing 0xFF bytes cannot be incremented, so skip over them.
    let mut end = key.len();
    while end > 0 && key[end - 1] == u8::MAX {
        end -= 1;
    }
    if end == 0 {
        return None;
    }
    key.truncate(end);
    key[end - 1] += 1;
    Some(key)
}
31
/// Longest tag value accepted when validating a replace key in `Db::put`
/// (the limit there is this value plus 8 + 32 bytes).
const MAX_TAG_VALUE_SIZE: usize = 255;
/// Schema version stored under the `"version"` key of the meta tree and
/// compared against on `Db::check_schema`.
const DB_VERSION: &str = "3";
34
/// Event store: one LMDB environment plus the trees (named databases) used
/// for event data and for each query index.
#[derive(Clone)]
pub struct Db {
    /// Underlying LMDB handle all trees were opened from.
    inner: Lmdb,
    /// Metadata; holds the `"version"` entry checked by `check_schema`.
    #[allow(unused)]
    t_meta: Tree,
    /// uid -> encoded event (see `encode_event`).
    t_data: Tree,
    /// uid -> serialized `EventIndex`.
    t_index: Tree,
    /// event id -> uid.
    t_id_uid: Tree,
    /// uid -> rkyv-serialized list of the event's search words.
    t_uid_word: Tree,
    /// `IndexKey::encode_id(id, time)` -> uid.
    t_id: Tree,
    /// `IndexKey::encode_pubkey(pubkey, time)` -> uid (author and delegator).
    t_pubkey: Tree,
    /// `IndexKey::encode_kind(kind, time)` -> uid.
    t_kind: Tree,
    /// `IndexKey::encode_pubkey_kind(pubkey, kind, time)` -> uid.
    t_pubkey_kind: Tree,
    /// `IndexKey::encode_time(created_at)` -> uid.
    t_created_at: Tree,
    /// `IndexKey::encode_tag(name, value, time)` -> uid followed by the kind.
    t_tag: Tree,
    /// Deletion markers written for kind-5 events and consulted by `put`.
    t_deletion: Tree,
    /// replace key (see `encode_replace_key`) -> uid of the current version.
    t_replacement: Tree,
    /// `IndexKey::encode_time(expiration)` -> uid.
    t_expiration: Tree,
    /// `IndexKey::encode_word(word, time)` -> uid.
    t_word: Tree,
    /// uid counter, seeded in `open` from the last key of `t_data`.
    seq: Arc<AtomicU64>,
}
65
66fn u64_from_bytes(bytes: &[u8]) -> Result<u64, Error> {
67 Ok(u64::from_be_bytes(bytes.try_into()?))
68}
69
70fn u16_from_bytes(bytes: &[u8]) -> Result<u16, Error> {
71 Ok(u16::from_be_bytes(bytes.try_into()?))
72}
73
74fn latest_seq(db: &Lmdb, tree: &Tree) -> Result<u64, Error> {
76 let txn = db.reader()?;
77 let mut iter = txn.iter_from(tree, Bound::Unbounded::<Vec<u8>>, true);
78 if let Some(item) = iter.next() {
79 let (k, _) = item?;
80 u64_from_bytes(k)
81 } else {
82 Ok(0)
83 }
84}
85
86#[cfg(feature = "zstd")]
87fn encode_event(event: &Event) -> Result<Vec<u8>> {
88 let json = event.to_json()?;
89 let mut json = zstd::encode_all(json.as_bytes(), 5).map_err(Error::Io)?;
90 json.push(1);
91 Ok(json)
92}
93#[cfg(not(feature = "zstd"))]
94fn encode_event(event: &Event) -> Result<String> {
95 event.to_json()
96}
97
98impl Db {
99 fn del_event(&self, writer: &mut Writer, event: &Event, uid: &[u8]) -> Result<(), Error> {
100 let index_event = event.index();
101 let time = index_event.created_at();
102 let kind = index_event.kind();
103 let pubkey = index_event.pubkey();
104
105 let bytes = writer.get(&self.t_uid_word, uid)?;
107 if let Some(bytes) = bytes {
108 let bytes = bytes.to_vec();
109 writer.del(&self.t_uid_word, uid, None)?;
110 let word = unsafe { rkyv::archived_root::<Vec<Vec<u8>>>(&bytes) };
111 for item in word.as_slice() {
112 writer.del(&self.t_word, IndexKey::encode_word(item, time), Some(uid))?;
113 }
114 }
115
116 writer.del(&self.t_data, uid, None)?;
117 writer.del(&self.t_index, uid, None)?;
118 writer.del(&self.t_id_uid, index_event.id(), None)?;
119
120 writer.del(
121 &self.t_id,
122 IndexKey::encode_id(index_event.id(), time),
123 Some(uid),
124 )?;
125
126 writer.del(&self.t_kind, IndexKey::encode_kind(kind, time), Some(uid))?;
127
128 writer.del(
129 &self.t_pubkey,
130 IndexKey::encode_pubkey(pubkey, time),
131 Some(uid),
132 )?;
133 writer.del(
134 &self.t_pubkey_kind,
135 IndexKey::encode_pubkey_kind(pubkey, kind, time),
136 Some(uid),
137 )?;
138
139 if let Some(delegator) = index_event.delegator() {
140 writer.del(
141 &self.t_pubkey,
142 IndexKey::encode_pubkey(delegator, time),
143 Some(uid),
144 )?;
145 writer.del(
146 &self.t_pubkey_kind,
147 IndexKey::encode_pubkey_kind(delegator, kind, time),
148 Some(uid),
149 )?;
150 }
151
152 writer.del(&self.t_created_at, IndexKey::encode_time(time), Some(uid))?;
153
154 let tagval = concat(uid, kind.to_be_bytes());
155 for tag in index_event.tags() {
156 writer.del(
157 &self.t_tag,
158 IndexKey::encode_tag(&tag.0, &tag.1, time),
159 Some(&tagval),
160 )?;
161 }
162
163 if let Some(k) = encode_replace_key(index_event.kind(), index_event.pubkey(), event.tags())
165 {
166 writer.del(&self.t_replacement, k, None)?;
167 }
168
169 if let Some(t) = index_event.expiration() {
171 writer.del(&self.t_expiration, IndexKey::encode_time(*t), Some(uid))?;
172 }
173
174 Ok(())
175 }
176
177 fn put_event(
178 &self,
179 writer: &mut Writer,
180 event: &Event,
181 uid: &Vec<u8>,
182 replace_key: &Option<Vec<u8>>,
183 ) -> Result<(), Error> {
184 let index_event = event.index();
185
186 let time = index_event.created_at();
188 let json = encode_event(event)?;
189
190 writer.put(&self.t_data, uid, json)?;
191
192 let bytes = index_event.to_bytes()?;
194 writer.put(&self.t_index, uid, bytes)?;
195
196 let kind = index_event.kind();
198 let pubkey = index_event.pubkey();
199
200 writer.put(&self.t_id_uid, index_event.id(), uid)?;
201
202 writer.put(&self.t_id, IndexKey::encode_id(index_event.id(), time), uid)?;
203
204 writer.put(&self.t_kind, IndexKey::encode_kind(kind, time), uid)?;
205
206 writer.put(&self.t_pubkey, IndexKey::encode_pubkey(pubkey, time), uid)?;
207 writer.put(
208 &self.t_pubkey_kind,
209 IndexKey::encode_pubkey_kind(pubkey, kind, time),
210 uid,
211 )?;
212
213 if let Some(delegator) = index_event.delegator() {
214 writer.put(
215 &self.t_pubkey,
216 IndexKey::encode_pubkey(delegator, time),
217 uid,
218 )?;
219 writer.put(
220 &self.t_pubkey_kind,
221 IndexKey::encode_pubkey_kind(delegator, kind, time),
222 uid,
223 )?;
224 }
225
226 writer.put(&self.t_created_at, IndexKey::encode_time(time), uid)?;
227
228 let tagval = concat(uid, kind.to_be_bytes());
229 for tag in index_event.tags() {
230 let key = &tag.0;
231 let v = &tag.1;
232 if kind == 5 && key[0] == 101 {
234 writer.put(&self.t_deletion, concat(index_event.id(), v), uid)?;
235 }
236 writer.put(&self.t_tag, IndexKey::encode_tag(key, v, time), &tagval)?;
238 }
239
240 if let Some(k) = replace_key {
242 writer.put(&self.t_replacement, k, uid)?;
244 }
245
246 if let Some(t) = index_event.expiration() {
248 writer.put(&self.t_expiration, IndexKey::encode_time(*t), uid)?;
249 }
250
251 let words = &event.words;
253 if !words.is_empty() {
254 let bytes =
255 rkyv::to_bytes::<_, 256>(words).map_err(|e| Error::Serialization(e.to_string()))?;
256 writer.put(&self.t_uid_word, uid, bytes)?;
257 for item in words {
258 writer.put(&self.t_word, IndexKey::encode_word(item, time), uid)?;
259 }
260 }
261 Ok(())
262 }
263}
264
265fn get_event<R: FromEventData, K: AsRef<[u8]>, T: Transaction>(
266 reader: &T,
267 id_tree: &Tree,
268 data_tree: &Tree,
269 index_tree: &Tree,
270 event_id: K,
271) -> Result<Option<(Vec<u8>, R)>, Error> {
272 let uid = get_uid(reader, id_tree, event_id)?;
273 if let Some(uid) = uid {
274 let event = get_event_by_uid(reader, data_tree, index_tree, &uid)?;
275 if let Some(event) = event {
276 return Ok(Some((uid, event)));
277 }
278 }
279 Ok(None)
280}
281
282fn get_event_by_uid<R: FromEventData, K: AsRef<[u8]>, T: Transaction>(
283 reader: &T,
284 data_tree: &Tree,
285 index_tree: &Tree,
286 uid: K,
287) -> Result<Option<R>, Error> {
288 if R::only_id() {
289 let v = reader.get(index_tree, uid)?;
291 let event = decode_event_index(v)?;
292 if let Some(v) = event {
293 return Ok(Some(
294 R::from_data(v.id()).map_err(|e| Error::Message(e.to_string()))?,
295 ));
296 }
297 } else {
298 let v = reader.get(data_tree, uid)?;
299 if let Some(v) = v {
300 return Ok(Some(
301 R::from_data(v).map_err(|e| Error::Message(e.to_string()))?,
302 ));
303 }
304 }
305 Ok(None)
306}
307
308fn decode_event_index(v: Option<&[u8]>) -> Result<Option<&ArchivedEventIndex>, Error> {
309 if let Some(v) = v {
310 return Ok(Some(EventIndex::from_zeroes(v)?));
311 }
312 Ok(None)
313}
314
315fn get_uid<K: AsRef<[u8]>, T: Transaction>(
316 reader: &T,
317 id_tree: &Tree,
318 event_id: K,
319) -> Result<Option<Vec<u8>>, Error> {
320 Ok(reader.get(id_tree, event_id)?.map(|v| v.to_vec()))
321}
322
/// Outcome of `Db::put`.
#[derive(Debug, Clone)]
pub enum CheckEventResult {
    /// The event was rejected; the string says why. (The variant name is
    /// misspelled, but it is part of the public API.)
    Invald(String),
    /// An event with the same id is already stored.
    Duplicate,
    /// A deletion marker exists for this event id and pubkey.
    Deleted,
    /// A stored replaceable event takes precedence over the new one.
    ReplaceIgnored,
    /// The event was stored. The value counts the stored event plus every
    /// event removed while storing it.
    Ok(usize),
}
331
332impl Db {
333 pub fn flush(&self) -> Result<()> {
334 self.inner.flush()?;
335 Ok(())
336 }
337
338 pub fn check_schema(&self) -> Result<()> {
340 let mut writer = self.inner.writer()?;
341 let old = writer.get(&self.t_meta, "version")?;
342 if let Some(old) = old {
343 if old != DB_VERSION.as_bytes() {
344 return Err(Error::VersionMismatch);
345 }
346 } else {
347 writer.put(&self.t_meta, "version", DB_VERSION)?;
348 }
349 writer.commit()?;
350 Ok(())
351 }
352
353 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
354 let inner = Lmdb::open_with(path, Some(20), Some(100), Some(1_000_000_000_000), 0)?;
355
356 let default_opts = 0;
357 let integer_default_opts = 0;
359
360 let index_opts = ffi::MDB_DUPSORT | ffi::MDB_DUPFIXED;
362
363 let integer_index_opts = ffi::MDB_DUPSORT | ffi::MDB_DUPFIXED;
367
368 let t_data = inner.open_tree(Some("t_data"), integer_default_opts)?;
369 let t_meta = inner.open_tree(Some("t_meta"), default_opts)?;
370
371 Ok(Self {
372 seq: Arc::new(AtomicU64::new(latest_seq(&inner, &t_data)?)),
373 t_data,
374 t_meta,
375 t_index: inner.open_tree(Some("t_index"), integer_default_opts)?,
376 t_id_uid: inner.open_tree(Some("t_id_uid"), default_opts)?,
377 t_uid_word: inner.open_tree(Some("t_uid_word"), default_opts)?,
378 t_deletion: inner.open_tree(Some("t_deletion"), default_opts)?,
379 t_replacement: inner.open_tree(Some("t_replacement"), default_opts)?,
380 t_id: inner.open_tree(Some("t_id"), default_opts)?,
381 t_pubkey: inner.open_tree(Some("t_pubkey"), index_opts)?,
382 t_kind: inner.open_tree(Some("t_kind"), index_opts)?,
383 t_pubkey_kind: inner.open_tree(Some("t_pubkey_kind"), index_opts)?,
384 t_created_at: inner.open_tree(Some("t_created_at"), integer_index_opts)?,
385 t_tag: inner.open_tree(Some("t_tag"), ffi::MDB_DUPSORT | ffi::MDB_DUPFIXED)?,
386 t_expiration: inner.open_tree(Some("t_expiration"), integer_index_opts)?,
387 t_word: inner.open_tree(Some("t_word"), index_opts)?,
388
389 inner,
390 })
391 }
392
393 pub fn writer(&self) -> Result<Writer> {
394 Ok(self.inner.writer()?)
395 }
396
397 pub fn reader(&self) -> Result<Reader> {
398 Ok(self.inner.reader()?)
399 }
400
401 pub fn commit<T: Transaction>(&self, txn: T) -> Result<()> {
402 Ok(txn.commit()?)
403 }
404
405 pub fn put<E: AsRef<Event>>(&self, writer: &mut Writer, event: E) -> Result<CheckEventResult> {
406 let event = event.as_ref();
407 let mut count = 0;
408
409 if event.id().len() != 32 || event.pubkey().len() != 32 {
410 return Ok(CheckEventResult::Invald(
411 "invalid event id or pubkey".to_owned(),
412 ));
413 }
414 let event_id = event.id();
416 let pubkey = event.pubkey();
417
418 {
420 if get_uid(writer, &self.t_id_uid, event_id)?.is_some() {
422 return Ok(CheckEventResult::Duplicate);
423 }
424 }
425
426 if writer
428 .get(&self.t_deletion, concat(event_id, pubkey))?
429 .is_some()
430 {
431 return Ok(CheckEventResult::Deleted);
432 }
433
434 if event.kind() == 5 {
437 for tag in event.index().tags() {
438 if tag.0 == b"e" {
439 let key = &tag.1;
441 let r = get_event::<Event, _, _>(
442 writer,
443 &self.t_id_uid,
444 &self.t_data,
445 &self.t_index,
446 key,
447 )?;
448 if let Some((uid, e)) = r {
449 if (e.pubkey() == event.pubkey()
452 || e.index().delegator() == Some(event.pubkey()))
453 && e.kind() != 5
454 {
455 count += 1;
456 self.del_event(writer, &e, &uid)?;
457 }
458 }
459 }
460 }
461 }
462
463 let replace_key = encode_replace_key(event.kind(), event.pubkey(), event.tags());
465
466 if let Some(replace_key) = replace_key.as_ref() {
467 if replace_key.len() > MAX_TAG_VALUE_SIZE + 8 + 32 {
470 return Ok(CheckEventResult::Invald("invalid replace key".to_owned()));
471 }
472
473 let v = writer.get(&self.t_replacement, replace_key)?;
475 if let Some(v) = v {
476 let uid = v.to_vec();
477 let e: Option<Event> = get_event_by_uid(writer, &self.t_data, &self.t_index, &uid)?;
483 if let Some(e) = e {
484 if event.created_at() < e.created_at()
486 || (event.created_at() == e.created_at() && event.id() > e.id())
487 {
488 return Ok(CheckEventResult::ReplaceIgnored);
489 }
490 count += 1;
492 self.del_event(writer, &e, &uid)?;
493 }
494 }
495 }
496
497 count += 1;
498
499 let seq = self.seq.fetch_add(1, Ordering::SeqCst);
500 let seq = u64_to_ver(seq);
501 self.put_event(writer, event, &seq, &replace_key)?;
502 Ok(CheckEventResult::Ok(count))
503 }
504
505 pub fn get<R: FromEventData, K: AsRef<[u8]>, T: Transaction>(
506 &self,
507 txn: &T,
508 event_id: K,
509 ) -> Result<Option<R>> {
510 let event = get_event(txn, &self.t_id_uid, &self.t_data, &self.t_index, event_id)?;
511 Ok(event.map(|e| e.1))
512 }
513
514 pub fn del<K: AsRef<[u8]>>(&self, writer: &mut Writer, event_id: K) -> Result<bool> {
515 if let Some((uid, event)) = get_event::<Event, _, _>(
516 writer,
517 &self.t_id_uid,
518 &self.t_data,
519 &self.t_index,
520 event_id,
521 )? {
522 self.del_event(writer, &event, &uid)?;
523 Ok(true)
524 } else {
525 Ok(false)
526 }
527 }
528
529 pub fn batch_put<II, N>(&self, events: II) -> Result<usize>
530 where
531 II: IntoIterator<Item = N>,
532 N: AsRef<Event>,
533 {
534 let mut writer = self.inner.writer()?;
535 let mut events = events.into_iter().collect::<Vec<N>>();
536
537 events.sort_by(|a, b| a.as_ref().id().cmp(b.as_ref().id()));
539 let mut count = 0;
540
541 for (i, event) in events.iter().enumerate() {
542 let event = event.as_ref();
543 if i != 0 && event.id() == events[i - 1].as_ref().id() {
545 continue;
546 }
547 if let CheckEventResult::Ok(c) = self.put(&mut writer, event)? {
548 count += c;
549 }
550 }
551
552 writer.commit()?;
553 Ok(count)
554 }
555
556 pub fn batch_get<R: FromEventData, II, N>(&self, event_ids: II) -> Result<Vec<R>>
557 where
558 II: IntoIterator<Item = N>,
559 N: AsRef<[u8]>,
560 {
561 let reader = self.reader()?;
562 let mut events = vec![];
563 for id in event_ids.into_iter() {
564 let r = self.get::<R, _, _>(&reader, &id)?;
565 if let Some(e) = r {
566 events.push(e);
567 }
568 }
569 Ok(events)
570 }
571
572 pub fn batch_del<II, N>(&self, event_ids: II) -> Result<()>
573 where
574 II: IntoIterator<Item = N>,
575 N: AsRef<[u8]>,
576 {
577 let mut writer = self.inner.writer()?;
578 for id in event_ids.into_iter() {
579 self.del(&mut writer, &id)?;
580 }
581 writer.commit()?;
582 Ok(())
583 }
584
585 pub fn iter<'txn, J: FromEventData, T: Transaction>(
587 &self,
588 txn: &'txn T,
589 filter: &Filter,
590 ) -> Result<Iter<'txn, T, J>> {
591 if filter.search.as_ref().is_some() {
592 let match_index = if !filter.ids.is_empty()
593 || !filter.tags.is_empty()
594 || !filter.authors.is_empty()
595 || !filter.kinds.is_empty()
596 {
597 MatchIndex::All
598 } else {
599 MatchIndex::None
600 };
601 Iter::new_word(self, txn, filter, &self.t_word, match_index)
602 } else if !filter.ids.is_empty() {
603 let match_index = if !filter.tags.is_empty()
604 || !filter.authors.is_empty()
605 || !filter.kinds.is_empty()
606 {
607 MatchIndex::All
608 } else {
609 MatchIndex::None
610 };
611 Iter::new_prefix(self, txn, filter, &filter.ids, &self.t_id, match_index)
612 } else if !filter.tags.is_empty() {
613 let match_index = if !filter.authors.is_empty() {
614 MatchIndex::Pubkey
615 } else {
616 MatchIndex::None
617 };
618 Iter::new_tag(self, txn, filter, &self.t_tag, match_index)
619 } else if !filter.authors.is_empty() && !filter.kinds.is_empty() {
620 Iter::new_author_kind(self, txn, filter, &self.t_pubkey_kind, MatchIndex::None)
621 } else if !filter.authors.is_empty() {
622 Iter::new_prefix(
623 self,
624 txn,
625 filter,
626 &filter.authors,
627 &self.t_pubkey,
628 MatchIndex::None,
629 )
630 } else if !filter.kinds.is_empty() {
631 Iter::new_kind(self, txn, filter, &self.t_kind, MatchIndex::None)
632 } else {
633 Iter::new_time(self, txn, filter, &self.t_created_at, MatchIndex::None)
634 }
635 }
636
637 pub fn iter_expiration<'txn, J: FromEventData, T: Transaction>(
639 &self,
640 txn: &'txn T,
641 until: Option<u64>,
642 ) -> Result<Iter<'txn, T, J>> {
643 let filter = Filter {
644 desc: true,
645 until,
646 ..Default::default()
647 };
648 Iter::new_time(self, txn, &filter, &self.t_expiration, MatchIndex::None)
649 }
650
651 pub fn iter_ephemeral<'txn, J: FromEventData, T: Transaction>(
653 &self,
654 txn: &'txn T,
655 until: Option<u64>,
656 ) -> Result<Iter<'txn, T, J>> {
657 let filter = Filter {
658 desc: false,
659 until,
660 ..Default::default()
661 };
662 let mut group = Group::new(filter.desc, false, false);
663 let prefix = u16_to_ver(20000);
664 let end = u16_to_ver(30000);
665
666 let iter = create_iter(txn, &self.t_kind, &prefix, filter.desc);
667 let scanner = Scanner::new(
668 iter,
669 vec![],
670 prefix,
671 filter.desc,
672 filter.since,
673 filter.until,
674 Box::new(move |_s, r| {
675 let k = r.0;
676 let e: &[u8] = end.as_ref();
677 Ok(if k < e {
678 MatchResult::Found(IndexKey::from(k, r.1)?)
679 } else {
680 MatchResult::Stop
681 })
682 }),
683 );
684 group.add(Box::new(scanner))?;
685 Iter::new(self, txn, &filter, group, MatchIndex::None)
686 }
687}
688
/// How a candidate produced by an index scan is re-checked against the
/// filter before its data is loaded.
#[derive(Debug)]
enum MatchIndex {
    /// Check the whole filter against the candidate's index record.
    All,
    /// Check only the author (pubkey or delegator).
    Pubkey,
    /// No re-check: the iterator loads the data without reading the index
    /// record.
    None,
}
703
704impl MatchIndex {
705 fn r#match(&self, filter: &Filter, event: &ArchivedEventIndex) -> bool {
706 match &self {
707 MatchIndex::Pubkey => {
708 Filter::match_author(&filter.authors, event.pubkey(), event.delegator())
709 }
710 _ => filter.match_archived(event),
711 }
712 }
713}
714
/// Iterator over the events selected by a filter; created through
/// `Db::iter` and its siblings.
pub struct Iter<'txn, R, J>
where
    R: Transaction,
{
    /// Transaction all lookups go through.
    reader: &'txn R,
    /// Clone of the data tree (uid -> encoded event).
    view_data: Tree,
    /// Clone of the index tree (uid -> serialized event index).
    view_index: Tree,
    /// Scanners producing candidate index keys.
    group: Group<'txn, IndexKey, Error>,
    /// Number of data lookups attempted so far.
    get_data: u64,
    /// Number of index records decoded so far.
    get_index: u64,
    /// Copy of the filter this iterator was built for.
    filter: Filter,
    /// Ties the output type `J` to the iterator without storing a value.
    _r: PhantomData<J>,
    /// Re-check applied to each candidate before loading its data.
    match_index: MatchIndex,
}
731
732fn create_iter<'a, R: Transaction>(
733 reader: &'a R,
734 tree: &Tree,
735 prefix: &Vec<u8>,
736 reverse: bool,
737) -> LmdbIter<'a> {
738 if reverse {
739 let start = upper(prefix.clone())
740 .map(Bound::Excluded)
741 .unwrap_or(Bound::Unbounded);
742 reader.iter_from(tree, start, true)
743 } else {
744 reader.iter_from(tree, Bound::Included(prefix), false)
745 }
746}
747
748impl<'txn, R, J> Iter<'txn, R, J>
749where
750 R: Transaction,
751 J: FromEventData,
752{
753 fn new(
754 kv_db: &Db,
755 reader: &'txn R,
756 filter: &Filter,
757 group: Group<'txn, IndexKey, Error>,
758 match_index: MatchIndex,
759 ) -> Result<Self, Error> {
760 Ok(Self {
761 view_data: kv_db.t_data.clone(),
762 view_index: kv_db.t_index.clone(),
763 reader,
764 group,
765 get_data: 0,
766 get_index: 0,
767 filter: filter.clone(),
768 _r: PhantomData,
770 match_index,
771 })
772 }
773
774 fn new_time(
776 kv_db: &Db,
777 reader: &'txn R,
778 filter: &Filter,
779 view: &Tree,
780 match_index: MatchIndex,
781 ) -> Result<Self, Error> {
782 let mut group = Group::new(filter.desc, false, false);
783 let prefix = if filter.desc {
784 (u64::MAX - 1).to_be_bytes()
785 } else {
786 0u64.to_be_bytes()
787 }
788 .to_vec();
789 let iter = create_iter(reader, view, &prefix, filter.desc);
790 let scanner = Scanner::new(
791 iter,
792 vec![],
793 prefix,
794 filter.desc,
795 filter.since,
796 filter.until,
797 Box::new(|_, r| Ok(MatchResult::Found(IndexKey::from(r.0, r.1)?))),
798 );
799 group.add(Box::new(scanner))?;
800 Self::new(kv_db, reader, filter, group, match_index)
801 }
802
803 fn new_kind(
804 kv_db: &Db,
805 reader: &'txn R,
806 filter: &Filter,
807 view: &Tree,
808 match_index: MatchIndex,
809 ) -> Result<Self, Error> {
810 let mut group = Group::new(filter.desc, false, false);
811 for kind in filter.kinds.iter() {
812 let prefix = u16_to_ver(*kind);
813 let iter = create_iter(reader, view, &prefix, filter.desc);
814 let scanner = Scanner::new(
815 iter,
816 vec![],
817 prefix,
818 filter.desc,
819 filter.since,
820 filter.until,
821 Box::new(|s, r| {
822 let k = r.0;
823 Ok(if k.starts_with(&s.prefix) {
824 MatchResult::Found(IndexKey::from(k, r.1)?)
825 } else {
826 MatchResult::Stop
827 })
828 }),
829 );
830 group.add(Box::new(scanner))?;
831 }
832 Self::new(kv_db, reader, filter, group, match_index)
833 }
834
835 fn new_tag(
836 kv_db: &Db,
837 reader: &'txn R,
838 filter: &Filter,
839 view: &Tree,
840 match_index: MatchIndex,
841 ) -> Result<Self, Error> {
842 let mut group = Group::new(filter.desc, true, false);
843 let has_kind = !filter.kinds.is_empty();
844
845 for tag in filter.tags.iter() {
846 let mut sub = Group::new(filter.desc, false, true);
847 for key in tag.1.iter() {
848 let kinds = filter.kinds.clone();
849 let prefix = concat_sep(concat_sep(tag.0, key), vec![]);
853 let klen = prefix.len() + 8;
854 let iter = create_iter(reader, view, &prefix, filter.desc);
855
856 let scanner = Scanner::new(
857 iter,
858 vec![],
859 prefix,
860 filter.desc,
861 filter.since,
862 filter.until,
863 Box::new(move |s, r| {
864 let k = r.0;
865 let v = r.1;
866 Ok(if k.len() == klen && k.starts_with(&s.prefix) {
867 if has_kind && !Filter::match_kind(&kinds, u16_from_bytes(&v[8..10])?) {
869 MatchResult::Continue
870 } else {
871 MatchResult::Found(IndexKey::from(k, v)?)
872 }
873 } else {
874 MatchResult::Stop
875 })
876 }),
877 );
878 sub.add(Box::new(scanner))?;
879 }
880 group.add(Box::new(sub))?;
881 }
882 Self::new(kv_db, reader, filter, group, match_index)
883 }
884
885 fn new_author_kind(
886 kv_db: &Db,
887 reader: &'txn R,
888 filter: &Filter,
889 view: &Tree,
890 match_index: MatchIndex,
891 ) -> Result<Self, Error> {
892 let mut group = Group::new(filter.desc, false, false);
893
894 for author in filter.authors.iter() {
895 for kind in filter.kinds.iter() {
896 let prefix: Vec<u8> = concat(author, u16_to_ver(*kind));
897 let iter = create_iter(reader, view, &prefix, filter.desc);
898 let scanner = Scanner::new(
899 iter,
900 author.to_vec(),
901 prefix,
902 filter.desc,
903 filter.since,
904 filter.until,
905 Box::new(|s, r| {
906 Ok(if r.0.starts_with(&s.prefix) {
907 MatchResult::Found(IndexKey::from(r.0, r.1)?)
908 } else {
909 MatchResult::Stop
910 })
911 }),
912 );
913 group.add(Box::new(scanner))?;
914 }
915 }
916
917 Self::new(kv_db, reader, filter, group, match_index)
918 }
919
920 fn new_prefix(
921 kv_db: &Db,
922 reader: &'txn R,
923 filter: &Filter,
924 ids: &[[u8; 32]],
925 view: &Tree,
926 match_index: MatchIndex,
927 ) -> Result<Self, Error> {
928 let mut group = Group::new(filter.desc, false, false);
929
930 for id in ids.iter() {
931 let prefix = id.to_vec();
932 let iter = create_iter(reader, view, &prefix, filter.desc);
933 let scanner = Scanner::new(
934 iter,
935 prefix.clone(),
936 prefix,
937 filter.desc,
938 filter.since,
939 filter.until,
940 Box::new(move |s, r| {
941 Ok(if r.0.starts_with(&s.prefix) {
942 MatchResult::Found(IndexKey::from(r.0, r.1)?)
943 } else {
944 MatchResult::Stop
945 })
946 }),
947 );
948 group.add(Box::new(scanner))?;
949 }
950 Self::new(kv_db, reader, filter, group, match_index)
951 }
952
953 fn new_word(
954 kv_db: &Db,
955 reader: &'txn R,
956 filter: &Filter,
957 view: &Tree,
958 match_index: MatchIndex,
959 ) -> Result<Self, Error> {
960 let mut group = Group::new(filter.desc, true, true);
961 for word in filter.words.iter() {
962 let prefix = concat_sep(word, []);
963 let klen = prefix.len() + 8;
964 let iter = create_iter(reader, view, &prefix, filter.desc);
965 let scanner = Scanner::new(
966 iter,
967 vec![],
968 prefix,
969 filter.desc,
970 filter.since,
971 filter.until,
972 Box::new(move |s, r| {
973 let k = r.0;
974 Ok(if k.len() == klen && k.starts_with(&s.prefix) {
975 MatchResult::Found(IndexKey::from(k, r.1)?)
976 } else {
977 MatchResult::Stop
978 })
979 }),
980 );
981 group.add(Box::new(scanner))?;
982 }
983 Self::new(kv_db, reader, filter, group, match_index)
984 }
985
986 fn document(&self, key: &IndexKey) -> Result<Option<J>, Error> {
987 get_event_by_uid::<J, _, _>(
988 self.reader,
989 &self.view_data,
990 &self.view_index,
991 key.uid().to_be_bytes(),
992 )
993 }
994
995 fn index_data(&self, key: &IndexKey) -> Result<Option<&'txn [u8]>, Error> {
996 let v = self.reader.get(&self.view_index, key.uid().to_be_bytes())?;
997 Ok(v)
998 }
999
1000 fn limit(&self, num: u64) -> bool {
1001 if let Some(limit) = self.filter.limit {
1002 num >= limit
1003 } else {
1004 false
1005 }
1006 }
1007
1008 fn next_inner(&mut self) -> Result<Option<J>, Error> {
1009 while let Some(item) = self.group.next() {
1010 let key = item?;
1011 if matches!(self.match_index, MatchIndex::None) {
1012 self.get_data += 1;
1013 if let Some(event) = self.document(&key)? {
1014 return Ok(Some(event));
1015 }
1016 } else {
1017 let data = self.index_data(&key)?;
1018 let event = decode_event_index(data)?;
1019 self.get_index += 1;
1020 if let Some(event) = event {
1021 if self.match_index.r#match(&self.filter, event) {
1022 self.get_data += 1;
1023 if let Some(event) = self.document(&key)? {
1024 return Ok(Some(event));
1025 }
1026 }
1027 }
1028 }
1029 }
1030 Ok(None)
1031 }
1032}
1033
1034impl<'txn, R, J> Iter<'txn, R, J>
1035where
1036 R: Transaction,
1037 J: FromEventData,
1038{
1039 pub fn scan_time(&mut self, timeout: Duration, check_step: u64) {
1041 let start = Instant::now();
1042 let mut last = check_step;
1043 self.group.watcher(Box::new(move |count| {
1044 if count > last {
1045 if start.elapsed() > timeout {
1047 return Err(Error::ScanTimeout);
1048 }
1049 last = count + check_step;
1050 }
1051 Ok(())
1052 }));
1053 }
1054
1055 pub fn stats(&self) -> Stats {
1057 Stats {
1058 scan_index: self.group.scan_times,
1059 get_data: self.get_data,
1060 get_index: self.get_index,
1061 }
1062 }
1063
1064 pub fn size(mut self) -> Result<(u64, Stats)> {
1066 let mut len = 0;
1067 while let Some(item) = self.group.next() {
1068 let key = item?;
1069 if matches!(self.match_index, MatchIndex::None) {
1070 len += 1;
1071 if self.limit(len) {
1072 break;
1073 }
1074 } else {
1075 let data = self.index_data(&key)?;
1076 let event = decode_event_index(data)?;
1077 self.get_index += 1;
1078 if let Some(event) = event {
1079 if self.match_index.r#match(&self.filter, event) {
1080 len += 1;
1081 if self.limit(len) {
1082 break;
1083 }
1084 }
1085 }
1086 }
1087 }
1088 Ok((
1089 len,
1090 Stats {
1091 get_data: 0,
1092 get_index: self.get_index,
1093 scan_index: self.group.scan_times,
1094 },
1095 ))
1096 }
1097}
1098
1099impl<'txn, R, J> Iterator for Iter<'txn, R, J>
1100where
1101 R: Transaction,
1102 J: FromEventData,
1103{
1104 type Item = Result<J, Error>;
1105 fn next(&mut self) -> Option<Self::Item> {
1106 if self.limit(self.get_data) {
1107 None
1108 } else {
1109 self.next_inner().transpose()
1110 }
1111 }
1112}
1113
#[cfg(test)]
mod tests {
    use super::upper;

    /// `upper` bumps the last byte below 0xFF and drops everything after it.
    #[test]
    pub fn test_upper_fn() {
        let cases = vec![
            (vec![1, 2, 3, 4, 5], Some(vec![1, 2, 3, 4, 6])),
            (vec![1, 2, 3, 4, 255], Some(vec![1, 2, 3, 5])),
            (vec![1, 2, 3, 255, 255], Some(vec![1, 2, 4])),
            (vec![1, 2, 255, 255, 255], Some(vec![1, 3])),
            (vec![1, 255, 255, 255, 255], Some(vec![2])),
            (vec![255, 255, 255, 255, 255], None),
            (vec![1, 2, 3, 255, 5], Some(vec![1, 2, 3, 255, 6])),
            (vec![255, 2, 3, 4, 5], Some(vec![255, 2, 3, 4, 6])),
        ];
        for (input, expected) in cases {
            assert_eq!(upper(input), expected);
        }
    }
}