nostr_db/
db.rs

1use crate::{
2    error::Error,
3    key::{concat, concat_sep, encode_replace_key, u16_to_ver, u64_to_ver, IndexKey},
4    ArchivedEventIndex, Event, EventIndex, Filter, FromEventData, Stats,
5};
6use nostr_kv::{
7    lmdb::{Db as Lmdb, Iter as LmdbIter, *},
8    scanner::{Group, GroupItem, MatchResult, Scanner},
9};
10
11use std::{
12    marker::PhantomData,
13    ops::Bound,
14    path::Path,
15    sync::{
16        atomic::{AtomicU64, Ordering},
17        Arc,
18    },
19    time::{Duration, Instant},
20};
21
/// Module-local result alias; the error type defaults to [`Error`].
type Result<T, E = Error> = core::result::Result<T, E>;
23
/// Computes the exclusive upper bound of the key range that starts with `key`.
///
/// Trailing `0xff` bytes are dropped and the last remaining byte is
/// incremented, e.g. `[1, 2, 0xff]` becomes `[1, 3]`.
///
/// Returns `None` when `key` is empty or made only of `0xff` bytes, because
/// no such bound exists.
pub fn upper(mut key: Vec<u8>) -> Option<Vec<u8>> {
    // Index of the right-most byte that can still be incremented.
    let position = key.iter().rposition(|&byte| byte != u8::MAX)?;
    key[position] += 1;
    key.truncate(position + 1);
    Some(key)
}
31
// Size limit used when validating replacement keys: `Db::put` rejects keys
// longer than `MAX_TAG_VALUE_SIZE + 8 + 32` bytes.
const MAX_TAG_VALUE_SIZE: usize = 255;
// Schema version written under the "version" key of the meta tree and
// compared by `Db::check_schema`.
const DB_VERSION: &str = "3";
34
/// Handle to the event store: the LMDB environment, one tree per data set or
/// index, and the in-memory counter used to allocate record ids (uids).
#[derive(Clone)]
pub struct Db {
    inner: Lmdb,
    #[allow(unused)]
    // meta data; holds the schema version under the "version" key
    t_meta: Tree,
    // uid -> encoded event
    t_data: Tree,
    // uid -> serialized event index
    t_index: Tree,
    // event id -> uid
    t_id_uid: Tree,
    // uid -> serialized list of the event's words
    t_uid_word: Tree,
    // (id, time) -> uid
    t_id: Tree,
    // (pubkey, time) -> uid; also written for the delegator when present
    t_pubkey: Tree,
    // (kind, time) -> uid
    t_kind: Tree,
    // (pubkey, kind, time) -> uid; also written for the delegator when present
    t_pubkey_kind: Tree,
    // time -> uid
    t_created_at: Tree,
    // (tag name, tag value, time) -> uid followed by the big-endian kind
    t_tag: Tree,
    // records written for kind 5 (deletion) events, see `put_event`
    t_deletion: Tree,
    // replacement key -> uid of the event currently holding that key
    t_replacement: Tree,
    // expiration time -> uid
    t_expiration: Tree,
    // (word, time) -> uid
    t_word: Tree,
    // counter from which `put` allocates uids; shared between clones
    seq: Arc<AtomicU64>,
}
65
66fn u64_from_bytes(bytes: &[u8]) -> Result<u64, Error> {
67    Ok(u64::from_be_bytes(bytes.try_into()?))
68}
69
70fn u16_from_bytes(bytes: &[u8]) -> Result<u16, Error> {
71    Ok(u16::from_be_bytes(bytes.try_into()?))
72}
73
74// Get the latest seq from db
75fn latest_seq(db: &Lmdb, tree: &Tree) -> Result<u64, Error> {
76    let txn = db.reader()?;
77    let mut iter = txn.iter_from(tree, Bound::Unbounded::<Vec<u8>>, true);
78    if let Some(item) = iter.next() {
79        let (k, _) = item?;
80        u64_from_bytes(k)
81    } else {
82        Ok(0)
83    }
84}
85
/// Serializes `event` to JSON and compresses it with zstd at level 5.
///
/// A single byte `1` is appended after the compressed payload
/// (presumably a format marker for the decoder — confirm against the reader).
#[cfg(feature = "zstd")]
fn encode_event(event: &Event) -> Result<Vec<u8>> {
    let json = event.to_json()?;
    let mut encoded = zstd::encode_all(json.as_bytes(), 5).map_err(Error::Io)?;
    encoded.push(1);
    Ok(encoded)
}
93#[cfg(not(feature = "zstd"))]
94fn encode_event(event: &Event) -> Result<String> {
95    event.to_json()
96}
97
98impl Db {
99    fn del_event(&self, writer: &mut Writer, event: &Event, uid: &[u8]) -> Result<(), Error> {
100        let index_event = event.index();
101        let time = index_event.created_at();
102        let kind = index_event.kind();
103        let pubkey = index_event.pubkey();
104
105        // word
106        let bytes = writer.get(&self.t_uid_word, uid)?;
107        if let Some(bytes) = bytes {
108            let bytes = bytes.to_vec();
109            writer.del(&self.t_uid_word, uid, None)?;
110            let word = unsafe { rkyv::archived_root::<Vec<Vec<u8>>>(&bytes) };
111            for item in word.as_slice() {
112                writer.del(&self.t_word, IndexKey::encode_word(item, time), Some(uid))?;
113            }
114        }
115
116        writer.del(&self.t_data, uid, None)?;
117        writer.del(&self.t_index, uid, None)?;
118        writer.del(&self.t_id_uid, index_event.id(), None)?;
119
120        writer.del(
121            &self.t_id,
122            IndexKey::encode_id(index_event.id(), time),
123            Some(uid),
124        )?;
125
126        writer.del(&self.t_kind, IndexKey::encode_kind(kind, time), Some(uid))?;
127
128        writer.del(
129            &self.t_pubkey,
130            IndexKey::encode_pubkey(pubkey, time),
131            Some(uid),
132        )?;
133        writer.del(
134            &self.t_pubkey_kind,
135            IndexKey::encode_pubkey_kind(pubkey, kind, time),
136            Some(uid),
137        )?;
138
139        if let Some(delegator) = index_event.delegator() {
140            writer.del(
141                &self.t_pubkey,
142                IndexKey::encode_pubkey(delegator, time),
143                Some(uid),
144            )?;
145            writer.del(
146                &self.t_pubkey_kind,
147                IndexKey::encode_pubkey_kind(delegator, kind, time),
148                Some(uid),
149            )?;
150        }
151
152        writer.del(&self.t_created_at, IndexKey::encode_time(time), Some(uid))?;
153
154        let tagval = concat(uid, kind.to_be_bytes());
155        for tag in index_event.tags() {
156            writer.del(
157                &self.t_tag,
158                IndexKey::encode_tag(&tag.0, &tag.1, time),
159                Some(&tagval),
160            )?;
161        }
162
163        // replacement index
164        if let Some(k) = encode_replace_key(index_event.kind(), index_event.pubkey(), event.tags())
165        {
166            writer.del(&self.t_replacement, k, None)?;
167        }
168
169        // expiration
170        if let Some(t) = index_event.expiration() {
171            writer.del(&self.t_expiration, IndexKey::encode_time(*t), Some(uid))?;
172        }
173
174        Ok(())
175    }
176
177    fn put_event(
178        &self,
179        writer: &mut Writer,
180        event: &Event,
181        uid: &Vec<u8>,
182        replace_key: &Option<Vec<u8>>,
183    ) -> Result<(), Error> {
184        let index_event = event.index();
185
186        // put event
187        let time = index_event.created_at();
188        let json = encode_event(event)?;
189
190        writer.put(&self.t_data, uid, json)?;
191
192        // put index
193        let bytes = index_event.to_bytes()?;
194        writer.put(&self.t_index, uid, bytes)?;
195
196        // put view
197        let kind = index_event.kind();
198        let pubkey = index_event.pubkey();
199
200        writer.put(&self.t_id_uid, index_event.id(), uid)?;
201
202        writer.put(&self.t_id, IndexKey::encode_id(index_event.id(), time), uid)?;
203
204        writer.put(&self.t_kind, IndexKey::encode_kind(kind, time), uid)?;
205
206        writer.put(&self.t_pubkey, IndexKey::encode_pubkey(pubkey, time), uid)?;
207        writer.put(
208            &self.t_pubkey_kind,
209            IndexKey::encode_pubkey_kind(pubkey, kind, time),
210            uid,
211        )?;
212
213        if let Some(delegator) = index_event.delegator() {
214            writer.put(
215                &self.t_pubkey,
216                IndexKey::encode_pubkey(delegator, time),
217                uid,
218            )?;
219            writer.put(
220                &self.t_pubkey_kind,
221                IndexKey::encode_pubkey_kind(delegator, kind, time),
222                uid,
223            )?;
224        }
225
226        writer.put(&self.t_created_at, IndexKey::encode_time(time), uid)?;
227
228        let tagval = concat(uid, kind.to_be_bytes());
229        for tag in index_event.tags() {
230            let key = &tag.0;
231            let v = &tag.1;
232            // tag[0] == 'e'
233            if kind == 5 && key[0] == 101 {
234                writer.put(&self.t_deletion, concat(index_event.id(), v), uid)?;
235            }
236            // Provide pubkey kind for filter
237            writer.put(&self.t_tag, IndexKey::encode_tag(key, v, time), &tagval)?;
238        }
239
240        // replacement index
241        if let Some(k) = replace_key {
242            // writer.put(&self.t_replacement, k, concat(time.to_be_bytes(), uid))?;
243            writer.put(&self.t_replacement, k, uid)?;
244        }
245
246        // expiration
247        if let Some(t) = index_event.expiration() {
248            writer.put(&self.t_expiration, IndexKey::encode_time(*t), uid)?;
249        }
250
251        // word
252        let words = &event.words;
253        if !words.is_empty() {
254            let bytes =
255                rkyv::to_bytes::<_, 256>(words).map_err(|e| Error::Serialization(e.to_string()))?;
256            writer.put(&self.t_uid_word, uid, bytes)?;
257            for item in words {
258                writer.put(&self.t_word, IndexKey::encode_word(item, time), uid)?;
259            }
260        }
261        Ok(())
262    }
263}
264
265fn get_event<R: FromEventData, K: AsRef<[u8]>, T: Transaction>(
266    reader: &T,
267    id_tree: &Tree,
268    data_tree: &Tree,
269    index_tree: &Tree,
270    event_id: K,
271) -> Result<Option<(Vec<u8>, R)>, Error> {
272    let uid = get_uid(reader, id_tree, event_id)?;
273    if let Some(uid) = uid {
274        let event = get_event_by_uid(reader, data_tree, index_tree, &uid)?;
275        if let Some(event) = event {
276            return Ok(Some((uid, event)));
277        }
278    }
279    Ok(None)
280}
281
282fn get_event_by_uid<R: FromEventData, K: AsRef<[u8]>, T: Transaction>(
283    reader: &T,
284    data_tree: &Tree,
285    index_tree: &Tree,
286    uid: K,
287) -> Result<Option<R>, Error> {
288    if R::only_id() {
289        // get event id from index more faster
290        let v = reader.get(index_tree, uid)?;
291        let event = decode_event_index(v)?;
292        if let Some(v) = event {
293            return Ok(Some(
294                R::from_data(v.id()).map_err(|e| Error::Message(e.to_string()))?,
295            ));
296        }
297    } else {
298        let v = reader.get(data_tree, uid)?;
299        if let Some(v) = v {
300            return Ok(Some(
301                R::from_data(v).map_err(|e| Error::Message(e.to_string()))?,
302            ));
303        }
304    }
305    Ok(None)
306}
307
308fn decode_event_index(v: Option<&[u8]>) -> Result<Option<&ArchivedEventIndex>, Error> {
309    if let Some(v) = v {
310        return Ok(Some(EventIndex::from_zeroes(v)?));
311    }
312    Ok(None)
313}
314
315fn get_uid<K: AsRef<[u8]>, T: Transaction>(
316    reader: &T,
317    id_tree: &Tree,
318    event_id: K,
319) -> Result<Option<Vec<u8>>, Error> {
320    Ok(reader.get(id_tree, event_id)?.map(|v| v.to_vec()))
321}
322
/// Outcome of [`Db::put`].
#[derive(Debug, Clone)]
pub enum CheckEventResult {
    /// The event was rejected; the string gives the reason.
    /// (The variant name's spelling is part of the public API.)
    Invald(String),
    /// An event with the same id is already stored.
    Duplicate,
    /// A deletion record matching this event exists.
    Deleted,
    /// A stored replaceable event takes precedence over this one.
    ReplaceIgnored,
    /// The event was stored. The value counts the stored event plus every
    /// previously stored event that was removed while handling it.
    Ok(usize),
}
331
332impl Db {
333    pub fn flush(&self) -> Result<()> {
334        self.inner.flush()?;
335        Ok(())
336    }
337
338    /// check db version, return [`Error::VersionMismatch`] when db schema changed
339    pub fn check_schema(&self) -> Result<()> {
340        let mut writer = self.inner.writer()?;
341        let old = writer.get(&self.t_meta, "version")?;
342        if let Some(old) = old {
343            if old != DB_VERSION.as_bytes() {
344                return Err(Error::VersionMismatch);
345            }
346        } else {
347            writer.put(&self.t_meta, "version", DB_VERSION)?;
348        }
349        writer.commit()?;
350        Ok(())
351    }
352
    /// Opens the LMDB environment at `path` and every tree used by the store.
    ///
    /// The uid counter is seeded with the value returned by `latest_seq` for
    /// the data tree.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        // NOTE(review): the positional arguments are presumably the maximum
        // number of named databases, the maximum number of readers, the map
        // size in bytes and environment flags — confirm against
        // `Lmdb::open_with`.
        let inner = Lmdb::open_with(path, Some(20), Some(100), Some(1_000_000_000_000), 0)?;

        let default_opts = 0;
        // let integer_default_opts = ffi::MDB_INTEGERKEY;
        let integer_default_opts = 0;

        // let index_opts = ffi::MDB_DUPSORT | ffi::MDB_DUPFIXED | ffi::MDB_INTEGERDUP;
        // Index trees keep several fixed-size values (uids) per key.
        let index_opts = ffi::MDB_DUPSORT | ffi::MDB_DUPFIXED;

        // let integer_index_opts =
        // ffi::MDB_DUPSORT | ffi::MDB_INTEGERKEY | ffi::MDB_DUPFIXED | ffi::MDB_INTEGERDUP;
        // LMDB integer keys need a byte-order check (little-endian), so the
        // integer key flags stay disabled.
        let integer_index_opts = ffi::MDB_DUPSORT | ffi::MDB_DUPFIXED;

        // These two are opened first: `t_data` is needed to seed `seq` below.
        let t_data = inner.open_tree(Some("t_data"), integer_default_opts)?;
        let t_meta = inner.open_tree(Some("t_meta"), default_opts)?;

        Ok(Self {
            seq: Arc::new(AtomicU64::new(latest_seq(&inner, &t_data)?)),
            t_data,
            t_meta,
            t_index: inner.open_tree(Some("t_index"), integer_default_opts)?,
            t_id_uid: inner.open_tree(Some("t_id_uid"), default_opts)?,
            t_uid_word: inner.open_tree(Some("t_uid_word"), default_opts)?,
            t_deletion: inner.open_tree(Some("t_deletion"), default_opts)?,
            t_replacement: inner.open_tree(Some("t_replacement"), default_opts)?,
            t_id: inner.open_tree(Some("t_id"), default_opts)?,
            t_pubkey: inner.open_tree(Some("t_pubkey"), index_opts)?,
            t_kind: inner.open_tree(Some("t_kind"), index_opts)?,
            t_pubkey_kind: inner.open_tree(Some("t_pubkey_kind"), index_opts)?,
            t_created_at: inner.open_tree(Some("t_created_at"), integer_index_opts)?,
            t_tag: inner.open_tree(Some("t_tag"), ffi::MDB_DUPSORT | ffi::MDB_DUPFIXED)?,
            t_expiration: inner.open_tree(Some("t_expiration"), integer_index_opts)?,
            t_word: inner.open_tree(Some("t_word"), index_opts)?,

            // Moved last: every `open_tree` call above borrows `inner`.
            inner,
        })
    }
392
393    pub fn writer(&self) -> Result<Writer> {
394        Ok(self.inner.writer()?)
395    }
396
397    pub fn reader(&self) -> Result<Reader> {
398        Ok(self.inner.reader()?)
399    }
400
401    pub fn commit<T: Transaction>(&self, txn: T) -> Result<()> {
402        Ok(txn.commit()?)
403    }
404
405    pub fn put<E: AsRef<Event>>(&self, writer: &mut Writer, event: E) -> Result<CheckEventResult> {
406        let event = event.as_ref();
407        let mut count = 0;
408
409        if event.id().len() != 32 || event.pubkey().len() != 32 {
410            return Ok(CheckEventResult::Invald(
411                "invalid event id or pubkey".to_owned(),
412            ));
413        }
414        // let id: Vec<u8> = pad_start(event.id(), 32);
415        let event_id = event.id();
416        let pubkey = event.pubkey();
417
418        // Check duplicate event.
419        {
420            // dup in the db.
421            if get_uid(writer, &self.t_id_uid, event_id)?.is_some() {
422                return Ok(CheckEventResult::Duplicate);
423            }
424        }
425
426        // check deleted in db
427        if writer
428            .get(&self.t_deletion, concat(event_id, pubkey))?
429            .is_some()
430        {
431            return Ok(CheckEventResult::Deleted);
432        }
433
434        // [NIP-09](https://nips.be/9)
435        // delete event
436        if event.kind() == 5 {
437            for tag in event.index().tags() {
438                if tag.0 == b"e" {
439                    // let key = hex::decode(&tag.1).map_err(|e| Error::Hex(e))?;
440                    let key = &tag.1;
441                    let r = get_event::<Event, _, _>(
442                        writer,
443                        &self.t_id_uid,
444                        &self.t_data,
445                        &self.t_index,
446                        key,
447                    )?;
448                    if let Some((uid, e)) = r {
449                        // check author or deletion event
450                        // check delegator
451                        if (e.pubkey() == event.pubkey()
452                            || e.index().delegator() == Some(event.pubkey()))
453                            && e.kind() != 5
454                        {
455                            count += 1;
456                            self.del_event(writer, &e, &uid)?;
457                        }
458                    }
459                }
460            }
461        }
462
463        // check replacement event
464        let replace_key = encode_replace_key(event.kind(), event.pubkey(), event.tags());
465
466        if let Some(replace_key) = replace_key.as_ref() {
467            // lmdb max_key_size 511 bytes
468            // we only index tag value length < 255
469            if replace_key.len() > MAX_TAG_VALUE_SIZE + 8 + 32 {
470                return Ok(CheckEventResult::Invald("invalid replace key".to_owned()));
471            }
472
473            // replace in the db
474            let v = writer.get(&self.t_replacement, replace_key)?;
475            if let Some(v) = v {
476                let uid = v.to_vec();
477                // let t = &v[0..8];
478                // let t = u64_from_bytes(t);
479                // if event.created_at() < t {
480                //     continue;
481                // }
482                let e: Option<Event> = get_event_by_uid(writer, &self.t_data, &self.t_index, &uid)?;
483                if let Some(e) = e {
484                    // If two events have the same timestamp, the event with the lowest id (first in lexical order) SHOULD be retained, and the other discarded.
485                    if event.created_at() < e.created_at()
486                        || (event.created_at() == e.created_at() && event.id() > e.id())
487                    {
488                        return Ok(CheckEventResult::ReplaceIgnored);
489                    }
490                    // del old
491                    count += 1;
492                    self.del_event(writer, &e, &uid)?;
493                }
494            }
495        }
496
497        count += 1;
498
499        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
500        let seq = u64_to_ver(seq);
501        self.put_event(writer, event, &seq, &replace_key)?;
502        Ok(CheckEventResult::Ok(count))
503    }
504
505    pub fn get<R: FromEventData, K: AsRef<[u8]>, T: Transaction>(
506        &self,
507        txn: &T,
508        event_id: K,
509    ) -> Result<Option<R>> {
510        let event = get_event(txn, &self.t_id_uid, &self.t_data, &self.t_index, event_id)?;
511        Ok(event.map(|e| e.1))
512    }
513
514    pub fn del<K: AsRef<[u8]>>(&self, writer: &mut Writer, event_id: K) -> Result<bool> {
515        if let Some((uid, event)) = get_event::<Event, _, _>(
516            writer,
517            &self.t_id_uid,
518            &self.t_data,
519            &self.t_index,
520            event_id,
521        )? {
522            self.del_event(writer, &event, &uid)?;
523            Ok(true)
524        } else {
525            Ok(false)
526        }
527    }
528
529    pub fn batch_put<II, N>(&self, events: II) -> Result<usize>
530    where
531        II: IntoIterator<Item = N>,
532        N: AsRef<Event>,
533    {
534        let mut writer = self.inner.writer()?;
535        let mut events = events.into_iter().collect::<Vec<N>>();
536
537        // sort for check dup
538        events.sort_by(|a, b| a.as_ref().id().cmp(b.as_ref().id()));
539        let mut count = 0;
540
541        for (i, event) in events.iter().enumerate() {
542            let event = event.as_ref();
543            // dup in the input events
544            if i != 0 && event.id() == events[i - 1].as_ref().id() {
545                continue;
546            }
547            if let CheckEventResult::Ok(c) = self.put(&mut writer, event)? {
548                count += c;
549            }
550        }
551
552        writer.commit()?;
553        Ok(count)
554    }
555
556    pub fn batch_get<R: FromEventData, II, N>(&self, event_ids: II) -> Result<Vec<R>>
557    where
558        II: IntoIterator<Item = N>,
559        N: AsRef<[u8]>,
560    {
561        let reader = self.reader()?;
562        let mut events = vec![];
563        for id in event_ids.into_iter() {
564            let r = self.get::<R, _, _>(&reader, &id)?;
565            if let Some(e) = r {
566                events.push(e);
567            }
568        }
569        Ok(events)
570    }
571
572    pub fn batch_del<II, N>(&self, event_ids: II) -> Result<()>
573    where
574        II: IntoIterator<Item = N>,
575        N: AsRef<[u8]>,
576    {
577        let mut writer = self.inner.writer()?;
578        for id in event_ids.into_iter() {
579            self.del(&mut writer, &id)?;
580        }
581        writer.commit()?;
582        Ok(())
583    }
584
585    /// iter events by filter
586    pub fn iter<'txn, J: FromEventData, T: Transaction>(
587        &self,
588        txn: &'txn T,
589        filter: &Filter,
590    ) -> Result<Iter<'txn, T, J>> {
591        if filter.search.as_ref().is_some() {
592            let match_index = if !filter.ids.is_empty()
593                || !filter.tags.is_empty()
594                || !filter.authors.is_empty()
595                || !filter.kinds.is_empty()
596            {
597                MatchIndex::All
598            } else {
599                MatchIndex::None
600            };
601            Iter::new_word(self, txn, filter, &self.t_word, match_index)
602        } else if !filter.ids.is_empty() {
603            let match_index = if !filter.tags.is_empty()
604                || !filter.authors.is_empty()
605                || !filter.kinds.is_empty()
606            {
607                MatchIndex::All
608            } else {
609                MatchIndex::None
610            };
611            Iter::new_prefix(self, txn, filter, &filter.ids, &self.t_id, match_index)
612        } else if !filter.tags.is_empty() {
613            let match_index = if !filter.authors.is_empty() {
614                MatchIndex::Pubkey
615            } else {
616                MatchIndex::None
617            };
618            Iter::new_tag(self, txn, filter, &self.t_tag, match_index)
619        } else if !filter.authors.is_empty() && !filter.kinds.is_empty() {
620            Iter::new_author_kind(self, txn, filter, &self.t_pubkey_kind, MatchIndex::None)
621        } else if !filter.authors.is_empty() {
622            Iter::new_prefix(
623                self,
624                txn,
625                filter,
626                &filter.authors,
627                &self.t_pubkey,
628                MatchIndex::None,
629            )
630        } else if !filter.kinds.is_empty() {
631            Iter::new_kind(self, txn, filter, &self.t_kind, MatchIndex::None)
632        } else {
633            Iter::new_time(self, txn, filter, &self.t_created_at, MatchIndex::None)
634        }
635    }
636
637    /// iter expired events
638    pub fn iter_expiration<'txn, J: FromEventData, T: Transaction>(
639        &self,
640        txn: &'txn T,
641        until: Option<u64>,
642    ) -> Result<Iter<'txn, T, J>> {
643        let filter = Filter {
644            desc: true,
645            until,
646            ..Default::default()
647        };
648        Iter::new_time(self, txn, &filter, &self.t_expiration, MatchIndex::None)
649    }
650
651    /// iter ephemeral events
652    pub fn iter_ephemeral<'txn, J: FromEventData, T: Transaction>(
653        &self,
654        txn: &'txn T,
655        until: Option<u64>,
656    ) -> Result<Iter<'txn, T, J>> {
657        let filter = Filter {
658            desc: false,
659            until,
660            ..Default::default()
661        };
662        let mut group = Group::new(filter.desc, false, false);
663        let prefix = u16_to_ver(20000);
664        let end = u16_to_ver(30000);
665
666        let iter = create_iter(txn, &self.t_kind, &prefix, filter.desc);
667        let scanner = Scanner::new(
668            iter,
669            vec![],
670            prefix,
671            filter.desc,
672            filter.since,
673            filter.until,
674            Box::new(move |_s, r| {
675                let k = r.0;
676                let e: &[u8] = end.as_ref();
677                Ok(if k < e {
678                    MatchResult::Found(IndexKey::from(k, r.1)?)
679                } else {
680                    MatchResult::Stop
681                })
682            }),
683        );
684        group.add(Box::new(scanner))?;
685        Iter::new(self, txn, &filter, group, MatchIndex::None)
686    }
687}
688
689// type IterChecker<I, E> =
690//     Box<dyn Fn(&Scanner<I, IndexKey>, &IndexKey) -> Result<CheckResult, Error<E>>>;
691// #[allow(unused)]
692// enum CheckResult {
693//     Continue,
694//     Found,
695// }
696
/// Which additional check an iterator applies to the index record of a
/// candidate event (see `MatchIndex::match`).
#[derive(Debug)]
enum MatchIndex {
    /// Check the whole filter against the index record.
    All,
    /// Check only the author / delegator against the filter's authors.
    Pubkey,
    /// Chosen when the scanned index already covers the filter.
    /// NOTE(review): presumably callers skip the check for this variant;
    /// the code that consumes it is not visible here.
    None,
}
703
704impl MatchIndex {
705    fn r#match(&self, filter: &Filter, event: &ArchivedEventIndex) -> bool {
706        match &self {
707            MatchIndex::Pubkey => {
708                Filter::match_author(&filter.authors, event.pubkey(), event.delegator())
709            }
710            _ => filter.match_archived(event),
711        }
712    }
713}
714
/// State of a filtered scan over one of the index trees; `J` is the type
/// each matching event is converted into.
pub struct Iter<'txn, R, J>
where
    R: Transaction,
{
    // transaction every lookup goes through
    reader: &'txn R,
    // handle to the event data tree (cloned from `Db::t_data`)
    view_data: Tree,
    // handle to the event index tree (cloned from `Db::t_index`)
    view_index: Tree,
    // scanners producing candidate index keys
    group: Group<'txn, IndexKey, Error>,
    // counters, both start at 0 (presumably the number of data / index
    // lookups performed — confirm where they are incremented)
    get_data: u64,
    get_index: u64,
    // owned copy of the filter the iterator was built for
    filter: Filter,
    // checker: Option<IterChecker<D::Iter, D::Error>>,
    // ties the output type `J` to the iterator without storing a value
    _r: PhantomData<J>,
    // need get index data for filter
    match_index: MatchIndex,
}
731
732fn create_iter<'a, R: Transaction>(
733    reader: &'a R,
734    tree: &Tree,
735    prefix: &Vec<u8>,
736    reverse: bool,
737) -> LmdbIter<'a> {
738    if reverse {
739        let start = upper(prefix.clone())
740            .map(Bound::Excluded)
741            .unwrap_or(Bound::Unbounded);
742        reader.iter_from(tree, start, true)
743    } else {
744        reader.iter_from(tree, Bound::Included(prefix), false)
745    }
746}
747
748impl<'txn, R, J> Iter<'txn, R, J>
749where
750    R: Transaction,
751    J: FromEventData,
752{
753    fn new(
754        kv_db: &Db,
755        reader: &'txn R,
756        filter: &Filter,
757        group: Group<'txn, IndexKey, Error>,
758        match_index: MatchIndex,
759    ) -> Result<Self, Error> {
760        Ok(Self {
761            view_data: kv_db.t_data.clone(),
762            view_index: kv_db.t_index.clone(),
763            reader,
764            group,
765            get_data: 0,
766            get_index: 0,
767            filter: filter.clone(),
768            // checker: None,
769            _r: PhantomData,
770            match_index,
771        })
772    }
773
774    /// Filter from timestamp index
775    fn new_time(
776        kv_db: &Db,
777        reader: &'txn R,
778        filter: &Filter,
779        view: &Tree,
780        match_index: MatchIndex,
781    ) -> Result<Self, Error> {
782        let mut group = Group::new(filter.desc, false, false);
783        let prefix = if filter.desc {
784            (u64::MAX - 1).to_be_bytes()
785        } else {
786            0u64.to_be_bytes()
787        }
788        .to_vec();
789        let iter = create_iter(reader, view, &prefix, filter.desc);
790        let scanner = Scanner::new(
791            iter,
792            vec![],
793            prefix,
794            filter.desc,
795            filter.since,
796            filter.until,
797            Box::new(|_, r| Ok(MatchResult::Found(IndexKey::from(r.0, r.1)?))),
798        );
799        group.add(Box::new(scanner))?;
800        Self::new(kv_db, reader, filter, group, match_index)
801    }
802
803    fn new_kind(
804        kv_db: &Db,
805        reader: &'txn R,
806        filter: &Filter,
807        view: &Tree,
808        match_index: MatchIndex,
809    ) -> Result<Self, Error> {
810        let mut group = Group::new(filter.desc, false, false);
811        for kind in filter.kinds.iter() {
812            let prefix = u16_to_ver(*kind);
813            let iter = create_iter(reader, view, &prefix, filter.desc);
814            let scanner = Scanner::new(
815                iter,
816                vec![],
817                prefix,
818                filter.desc,
819                filter.since,
820                filter.until,
821                Box::new(|s, r| {
822                    let k = r.0;
823                    Ok(if k.starts_with(&s.prefix) {
824                        MatchResult::Found(IndexKey::from(k, r.1)?)
825                    } else {
826                        MatchResult::Stop
827                    })
828                }),
829            );
830            group.add(Box::new(scanner))?;
831        }
832        Self::new(kv_db, reader, filter, group, match_index)
833    }
834
835    fn new_tag(
836        kv_db: &Db,
837        reader: &'txn R,
838        filter: &Filter,
839        view: &Tree,
840        match_index: MatchIndex,
841    ) -> Result<Self, Error> {
842        let mut group = Group::new(filter.desc, true, false);
843        let has_kind = !filter.kinds.is_empty();
844
845        for tag in filter.tags.iter() {
846            let mut sub = Group::new(filter.desc, false, true);
847            for key in tag.1.iter() {
848                let kinds = filter.kinds.clone();
849                // need add separator to the end, otherwise other tags will intrude
850                // ["t", "nostr"]
851                // ["t", "nostr1"]
852                let prefix = concat_sep(concat_sep(tag.0, key), vec![]);
853                let klen = prefix.len() + 8;
854                let iter = create_iter(reader, view, &prefix, filter.desc);
855
856                let scanner = Scanner::new(
857                    iter,
858                    vec![],
859                    prefix,
860                    filter.desc,
861                    filter.since,
862                    filter.until,
863                    Box::new(move |s, r| {
864                        let k = r.0;
865                        let v = r.1;
866                        Ok(if k.len() == klen && k.starts_with(&s.prefix) {
867                            // filter
868                            if has_kind && !Filter::match_kind(&kinds, u16_from_bytes(&v[8..10])?) {
869                                MatchResult::Continue
870                            } else {
871                                MatchResult::Found(IndexKey::from(k, v)?)
872                            }
873                        } else {
874                            MatchResult::Stop
875                        })
876                    }),
877                );
878                sub.add(Box::new(scanner))?;
879            }
880            group.add(Box::new(sub))?;
881        }
882        Self::new(kv_db, reader, filter, group, match_index)
883    }
884
885    fn new_author_kind(
886        kv_db: &Db,
887        reader: &'txn R,
888        filter: &Filter,
889        view: &Tree,
890        match_index: MatchIndex,
891    ) -> Result<Self, Error> {
892        let mut group = Group::new(filter.desc, false, false);
893
894        for author in filter.authors.iter() {
895            for kind in filter.kinds.iter() {
896                let prefix: Vec<u8> = concat(author, u16_to_ver(*kind));
897                let iter = create_iter(reader, view, &prefix, filter.desc);
898                let scanner = Scanner::new(
899                    iter,
900                    author.to_vec(),
901                    prefix,
902                    filter.desc,
903                    filter.since,
904                    filter.until,
905                    Box::new(|s, r| {
906                        Ok(if r.0.starts_with(&s.prefix) {
907                            MatchResult::Found(IndexKey::from(r.0, r.1)?)
908                        } else {
909                            MatchResult::Stop
910                        })
911                    }),
912                );
913                group.add(Box::new(scanner))?;
914            }
915        }
916
917        Self::new(kv_db, reader, filter, group, match_index)
918    }
919
920    fn new_prefix(
921        kv_db: &Db,
922        reader: &'txn R,
923        filter: &Filter,
924        ids: &[[u8; 32]],
925        view: &Tree,
926        match_index: MatchIndex,
927    ) -> Result<Self, Error> {
928        let mut group = Group::new(filter.desc, false, false);
929
930        for id in ids.iter() {
931            let prefix = id.to_vec();
932            let iter = create_iter(reader, view, &prefix, filter.desc);
933            let scanner = Scanner::new(
934                iter,
935                prefix.clone(),
936                prefix,
937                filter.desc,
938                filter.since,
939                filter.until,
940                Box::new(move |s, r| {
941                    Ok(if r.0.starts_with(&s.prefix) {
942                        MatchResult::Found(IndexKey::from(r.0, r.1)?)
943                    } else {
944                        MatchResult::Stop
945                    })
946                }),
947            );
948            group.add(Box::new(scanner))?;
949        }
950        Self::new(kv_db, reader, filter, group, match_index)
951    }
952
953    fn new_word(
954        kv_db: &Db,
955        reader: &'txn R,
956        filter: &Filter,
957        view: &Tree,
958        match_index: MatchIndex,
959    ) -> Result<Self, Error> {
960        let mut group = Group::new(filter.desc, true, true);
961        for word in filter.words.iter() {
962            let prefix = concat_sep(word, []);
963            let klen = prefix.len() + 8;
964            let iter = create_iter(reader, view, &prefix, filter.desc);
965            let scanner = Scanner::new(
966                iter,
967                vec![],
968                prefix,
969                filter.desc,
970                filter.since,
971                filter.until,
972                Box::new(move |s, r| {
973                    let k = r.0;
974                    Ok(if k.len() == klen && k.starts_with(&s.prefix) {
975                        MatchResult::Found(IndexKey::from(k, r.1)?)
976                    } else {
977                        MatchResult::Stop
978                    })
979                }),
980            );
981            group.add(Box::new(scanner))?;
982        }
983        Self::new(kv_db, reader, filter, group, match_index)
984    }
985
986    fn document(&self, key: &IndexKey) -> Result<Option<J>, Error> {
987        get_event_by_uid::<J, _, _>(
988            self.reader,
989            &self.view_data,
990            &self.view_index,
991            key.uid().to_be_bytes(),
992        )
993    }
994
995    fn index_data(&self, key: &IndexKey) -> Result<Option<&'txn [u8]>, Error> {
996        let v = self.reader.get(&self.view_index, key.uid().to_be_bytes())?;
997        Ok(v)
998    }
999
1000    fn limit(&self, num: u64) -> bool {
1001        if let Some(limit) = self.filter.limit {
1002            num >= limit
1003        } else {
1004            false
1005        }
1006    }
1007
1008    fn next_inner(&mut self) -> Result<Option<J>, Error> {
1009        while let Some(item) = self.group.next() {
1010            let key = item?;
1011            if matches!(self.match_index, MatchIndex::None) {
1012                self.get_data += 1;
1013                if let Some(event) = self.document(&key)? {
1014                    return Ok(Some(event));
1015                }
1016            } else {
1017                let data = self.index_data(&key)?;
1018                let event = decode_event_index(data)?;
1019                self.get_index += 1;
1020                if let Some(event) = event {
1021                    if self.match_index.r#match(&self.filter, event) {
1022                        self.get_data += 1;
1023                        if let Some(event) = self.document(&key)? {
1024                            return Ok(Some(event));
1025                        }
1026                    }
1027                }
1028            }
1029        }
1030        Ok(None)
1031    }
1032}
1033
1034impl<'txn, R, J> Iter<'txn, R, J>
1035where
1036    R: Transaction,
1037    J: FromEventData,
1038{
1039    /// Limit the total scan time and report [`Error::ScanTimeout`] if it is exceeded
1040    pub fn scan_time(&mut self, timeout: Duration, check_step: u64) {
1041        let start = Instant::now();
1042        let mut last = check_step;
1043        self.group.watcher(Box::new(move |count| {
1044            if count > last {
1045                // check
1046                if start.elapsed() > timeout {
1047                    return Err(Error::ScanTimeout);
1048                }
1049                last = count + check_step;
1050            }
1051            Ok(())
1052        }));
1053    }
1054
1055    /// The stats after scan
1056    pub fn stats(&self) -> Stats {
1057        Stats {
1058            scan_index: self.group.scan_times,
1059            get_data: self.get_data,
1060            get_index: self.get_index,
1061        }
1062    }
1063
1064    /// only count iter size
1065    pub fn size(mut self) -> Result<(u64, Stats)> {
1066        let mut len = 0;
1067        while let Some(item) = self.group.next() {
1068            let key = item?;
1069            if matches!(self.match_index, MatchIndex::None) {
1070                len += 1;
1071                if self.limit(len) {
1072                    break;
1073                }
1074            } else {
1075                let data = self.index_data(&key)?;
1076                let event = decode_event_index(data)?;
1077                self.get_index += 1;
1078                if let Some(event) = event {
1079                    if self.match_index.r#match(&self.filter, event) {
1080                        len += 1;
1081                        if self.limit(len) {
1082                            break;
1083                        }
1084                    }
1085                }
1086            }
1087        }
1088        Ok((
1089            len,
1090            Stats {
1091                get_data: 0,
1092                get_index: self.get_index,
1093                scan_index: self.group.scan_times,
1094            },
1095        ))
1096    }
1097}
1098
1099impl<'txn, R, J> Iterator for Iter<'txn, R, J>
1100where
1101    R: Transaction,
1102    J: FromEventData,
1103{
1104    type Item = Result<J, Error>;
1105    fn next(&mut self) -> Option<Self::Item> {
1106        if self.limit(self.get_data) {
1107            None
1108        } else {
1109            self.next_inner().transpose()
1110        }
1111    }
1112}
1113
#[cfg(test)]
mod tests {
    use super::upper;

    /// `upper` increments the last byte below 255 and drops everything after
    /// it; it yields `None` when every byte is already 255.
    #[test]
    pub fn test_upper_fn() {
        let cases: Vec<(Vec<u8>, Option<Vec<u8>>)> = vec![
            (vec![1, 2, 3, 4, 5], Some(vec![1, 2, 3, 4, 6])),
            (vec![1, 2, 3, 4, 255], Some(vec![1, 2, 3, 5])),
            (vec![1, 2, 3, 255, 255], Some(vec![1, 2, 4])),
            (vec![1, 2, 255, 255, 255], Some(vec![1, 3])),
            (vec![1, 255, 255, 255, 255], Some(vec![2])),
            (vec![255, 255, 255, 255, 255], None),
            (vec![1, 2, 3, 255, 5], Some(vec![1, 2, 3, 255, 6])),
            (vec![255, 2, 3, 4, 5], Some(vec![255, 2, 3, 4, 6])),
        ];

        for (input, expected) in cases {
            assert_eq!(upper(input.clone()), expected, "upper({:?})", input);
        }
    }
}