// nostrdb_social/ndb.rs

1use std::ffi::CString;
2use std::ptr;
3
4use crate::bindings::ndb_search;
5use crate::{
6    bindings, Blocks, Config, Error, Filter, IngestMetadata, Note, NoteKey, NoteMetadata,
7    ProfileKey, ProfileRecord, QueryResult, Result, Subscription, SubscriptionState,
8    SubscriptionStream, Transaction,
9};
10use futures::StreamExt;
11use std::collections::hash_map::Entry;
12use std::collections::HashMap;
13use std::fs;
14use std::os::raw::c_int;
15use std::path::Path;
16use std::sync::{Arc, Mutex};
17use tracing::debug;
18
19#[derive(Debug)]
20struct NdbRef {
21    ndb: *mut bindings::ndb,
22    rust_cb_ctx: *mut ::std::os::raw::c_void,
23}
24
25/// SAFETY: thread safety is ensured by nostrdb
26unsafe impl Send for NdbRef {}
27
28/// SAFETY: thread safety is ensured by nostrdb
29unsafe impl Sync for NdbRef {}
30
31/// The database is automatically closed when [Ndb] is [Drop]ped.
32impl Drop for NdbRef {
33    fn drop(&mut self) {
34        unsafe {
35            bindings::ndb_destroy(self.ndb);
36
37            if !self.rust_cb_ctx.is_null() {
38                // Rebuild the Box from the raw pointer and drop it.
39                let _ = Box::from_raw(self.rust_cb_ctx as *mut Box<dyn FnMut()>);
40            }
41        }
42    }
43}
44
45type SubMap = HashMap<Subscription, SubscriptionState>;
46
47/// A nostrdb context. Construct one of these with [Ndb::new].
48#[derive(Debug, Clone)]
49pub struct Ndb {
50    refs: Arc<NdbRef>,
51
52    /// Track query future states
53    pub(crate) subs: Arc<Mutex<SubMap>>,
54}
55
56impl Ndb {
57    /// Construct a new nostrdb context. Takes a directory where the database
58    /// is/will be located and a nostrdb config.
59    pub fn new(db_dir: &str, config: &Config) -> Result<Self> {
60        let db_dir_cstr = match CString::new(db_dir) {
61            Ok(cstr) => cstr,
62            Err(_) => return Err(Error::DbOpenFailed),
63        };
64        let mut ndb: *mut bindings::ndb = ptr::null_mut();
65
66        let path = Path::new(db_dir);
67        if !path.exists() {
68            let _ = fs::create_dir_all(path);
69        }
70
71        let min_mapsize = 1024 * 1024 * 512;
72        let mut mapsize = config.config.mapsize;
73        let config = *config;
74
75        let prev_callback = config.config.sub_cb;
76        let prev_callback_ctx = config.config.sub_cb_ctx;
77        let subs = Arc::new(Mutex::new(SubMap::default()));
78        let subs_clone = subs.clone();
79
80        // We need to register our own callback so that we can wake
81        // query futures
82        let mut config = config.set_sub_callback(move |sub_id: u64| {
83            let mut map = subs_clone.lock().unwrap();
84            if let Some(s) = map.get_mut(&Subscription::new(sub_id)) {
85                if let Some(w) = s.waker.take() {
86                    w.wake();
87                }
88            }
89
90            if let Some(pcb) = prev_callback {
91                unsafe {
92                    pcb(prev_callback_ctx, sub_id);
93                };
94            }
95        });
96
97        let result = loop {
98            let result =
99                unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) };
100
101            if result == 0 {
102                mapsize /= 2;
103                config = config.set_mapsize(mapsize);
104                debug!("ndb init failed, reducing mapsize to {}", mapsize);
105
106                if mapsize > min_mapsize {
107                    continue;
108                } else {
109                    break 0;
110                }
111            } else {
112                break result;
113            }
114        };
115
116        if result == 0 {
117            return Err(Error::DbOpenFailed);
118        }
119
120        let rust_cb_ctx = config.config.sub_cb_ctx;
121        let refs = Arc::new(NdbRef { ndb, rust_cb_ctx });
122
123        Ok(Ndb { refs, subs })
124    }
125
126    /// Ingest a relay or client sent event, with optional relay metadata.
127    /// This function returns immediately and doesn't provide any information on
128    /// if ingestion was successful or not.
129    pub fn process_event_with(&self, json: &str, mut meta: IngestMetadata) -> Result<()> {
130        // Convert the Rust string to a C-style string
131        let c_json = CString::new(json)?;
132        let c_json_ptr = c_json.as_ptr();
133
134        // Get the length of the string
135        let len = json.len() as libc::c_int;
136
137        let res = unsafe {
138            bindings::ndb_process_event_with(self.as_ptr(), c_json_ptr, len, meta.as_mut_ptr())
139        };
140
141        if res == 0 {
142            return Err(Error::NoteProcessFailed);
143        }
144
145        Ok(())
146    }
147
148    /// Ingest a relay-sent event in the form `["EVENT","subid", {"id:"...}]`
149    /// This function returns immediately and doesn't provide any information on
150    /// if ingestion was successful or not.
151    pub fn process_event(&self, json: &str) -> Result<()> {
152        self.process_event_with(json, IngestMetadata::new().client(false))
153    }
154
155    /// Ingest a client-sent event in the form `["EVENT", {"id:"...}]`
156    /// This function returns immediately and doesn't provide any information on
157    /// if ingestion was successful or not.
158    pub fn process_client_event(&self, json: &str) -> Result<()> {
159        self.process_event_with(json, IngestMetadata::new().client(true))
160    }
161
162    pub fn query<'a>(
163        &self,
164        txn: &'a Transaction,
165        filters: &[Filter],
166        max_results: i32,
167    ) -> Result<Vec<QueryResult<'a>>> {
168        let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect();
169        let mut out: Vec<bindings::ndb_query_result> = vec![];
170        let mut returned: i32 = 0;
171        out.reserve_exact(max_results as usize);
172        let res = unsafe {
173            bindings::ndb_query(
174                txn.as_mut_ptr(),
175                ndb_filters.as_mut_ptr(),
176                ndb_filters.len() as i32,
177                out.as_mut_ptr(),
178                max_results,
179                &mut returned as *mut i32,
180            )
181        };
182        if res == 1 {
183            unsafe {
184                out.set_len(returned as usize);
185            };
186            Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect())
187        } else {
188            Err(Error::QueryError)
189        }
190    }
191
192    pub fn subscription_count(&self) -> u32 {
193        unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 }
194    }
195
196    pub fn unsubscribe(&mut self, sub: Subscription) -> Result<()> {
197        let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub.id()) };
198
199        debug!(
200            "unsubscribed from {}, sub count {}",
201            sub.id(),
202            self.subscription_count()
203        );
204
205        // mark the subscription as done if it exists in our stream map
206        {
207            let mut map = self.subs.lock().unwrap();
208            if let Entry::Occupied(mut entry) = map.entry(sub) {
209                entry.get_mut().done = true;
210            }
211        }
212
213        if r == 0 {
214            Err(Error::SubscriptionError)
215        } else {
216            Ok(())
217        }
218    }
219
220    pub fn subscribe(&self, filters: &[Filter]) -> Result<Subscription> {
221        unsafe {
222            let mut ndb_filters: Vec<bindings::ndb_filter> =
223                filters.iter().map(|a| a.data).collect();
224            let id = bindings::ndb_subscribe(
225                self.as_ptr(),
226                ndb_filters.as_mut_ptr(),
227                filters.len() as i32,
228            );
229            if id == 0 {
230                Err(Error::SubscriptionError)
231            } else {
232                Ok(Subscription::new(id))
233            }
234        }
235    }
236
237    pub fn poll_for_notes(&self, sub: Subscription, max_notes: u32) -> Vec<NoteKey> {
238        let mut vec = vec![];
239        vec.reserve_exact(max_notes as usize);
240
241        unsafe {
242            let res = bindings::ndb_poll_for_notes(
243                self.as_ptr(),
244                sub.id(),
245                vec.as_mut_ptr(),
246                max_notes as c_int,
247            );
248            vec.set_len(res as usize);
249        };
250
251        vec.into_iter().map(NoteKey::new).collect()
252    }
253
254    pub async fn wait_for_notes(
255        &self,
256        sub_id: Subscription,
257        max_notes: u32,
258    ) -> Result<Vec<NoteKey>> {
259        let mut stream = SubscriptionStream::new(self.clone(), sub_id).notes_per_await(max_notes);
260
261        match stream.next().await {
262            Some(res) => Ok(res),
263            None => Err(Error::SubscriptionError),
264        }
265    }
266
267    pub fn get_profile_by_key<'a>(
268        &self,
269        transaction: &'a Transaction,
270        key: ProfileKey,
271    ) -> Result<ProfileRecord<'a>> {
272        let mut len: usize = 0;
273
274        let profile_record_ptr = unsafe {
275            bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len)
276        };
277
278        if profile_record_ptr.is_null() {
279            // Handle null pointer (e.g., note not found or error occurred)
280            return Err(Error::NotFound);
281        }
282
283        // Convert the raw pointer to a Note instance
284        Ok(ProfileRecord::new_transactional(
285            profile_record_ptr,
286            len,
287            key,
288            transaction,
289        ))
290    }
291
292    pub fn get_profile_by_pubkey<'a>(
293        &self,
294        transaction: &'a Transaction,
295        id: &[u8; 32],
296    ) -> Result<ProfileRecord<'a>> {
297        let mut len: usize = 0;
298        let mut primkey: u64 = 0;
299
300        let profile_record_ptr = unsafe {
301            bindings::ndb_get_profile_by_pubkey(
302                transaction.as_mut_ptr(),
303                id.as_ptr(),
304                &mut len,
305                &mut primkey,
306            )
307        };
308
309        if profile_record_ptr.is_null() {
310            // Handle null pointer (e.g., note not found or error occurred)
311            return Err(Error::NotFound);
312        }
313
314        // Convert the raw pointer to a Note instance
315        Ok(ProfileRecord::new_transactional(
316            profile_record_ptr,
317            len,
318            ProfileKey::new(primkey),
319            transaction,
320        ))
321    }
322
323    pub fn get_note_metadata<'a>(
324        &self,
325        txn: &'a Transaction,
326        id: &[u8; 32],
327    ) -> Result<NoteMetadata<'a>> {
328        let res = unsafe {
329            let res = bindings::ndb_get_note_meta(
330                txn.as_mut_ptr(),
331                id.as_ptr() as *const ::std::os::raw::c_uchar,
332            );
333
334            if res.is_null() {
335                return Err(Error::NotFound);
336            }
337
338            &mut *res
339        };
340
341        Ok(NoteMetadata::new(res))
342    }
343
344    pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<NoteKey> {
345        let res = unsafe {
346            bindings::ndb_get_notekey_by_id(
347                txn.as_mut_ptr(),
348                id.as_ptr() as *const ::std::os::raw::c_uchar,
349            )
350        };
351
352        if res == 0 {
353            return Err(Error::NotFound);
354        }
355
356        Ok(NoteKey::new(res))
357    }
358
359    pub fn get_profilekey_by_pubkey(
360        &self,
361        txn: &Transaction,
362        pubkey: &[u8; 32],
363    ) -> Result<ProfileKey> {
364        let res = unsafe {
365            bindings::ndb_get_profilekey_by_pubkey(
366                txn.as_mut_ptr(),
367                pubkey.as_ptr() as *const ::std::os::raw::c_uchar,
368            )
369        };
370
371        if res == 0 {
372            return Err(Error::NotFound);
373        }
374
375        Ok(ProfileKey::new(res))
376    }
377
378    pub fn get_blocks_by_key<'a>(
379        &self,
380        txn: &'a Transaction,
381        note_key: NoteKey,
382    ) -> Result<Blocks<'a>> {
383        let blocks_ptr = unsafe {
384            bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64())
385        };
386
387        if blocks_ptr.is_null() {
388            return Err(Error::NotFound);
389        }
390
391        Ok(Blocks::new_transactional(blocks_ptr, txn))
392    }
393
394    pub fn get_note_by_key<'a>(
395        &self,
396        transaction: &'a Transaction,
397        note_key: NoteKey,
398    ) -> Result<Note<'a>> {
399        let mut len: usize = 0;
400
401        let note_ptr = unsafe {
402            bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len)
403        };
404
405        if note_ptr.is_null() {
406            // Handle null pointer (e.g., note not found or error occurred)
407            return Err(Error::NotFound);
408        }
409
410        // Convert the raw pointer to a Note instance
411        Ok(Note::new_transactional(
412            note_ptr,
413            len,
414            note_key,
415            transaction,
416        ))
417    }
418
419    /// Get a note from the database. Takes a [Transaction] and a 32-byte [Note] Id
420    pub fn get_note_by_id<'a>(
421        &self,
422        transaction: &'a Transaction,
423        id: &[u8; 32],
424    ) -> Result<Note<'a>> {
425        let mut len: usize = 0;
426        let mut primkey: u64 = 0;
427
428        let note_ptr = unsafe {
429            bindings::ndb_get_note_by_id(
430                transaction.as_mut_ptr(),
431                id.as_ptr(),
432                &mut len,
433                &mut primkey,
434            )
435        };
436
437        if note_ptr.is_null() {
438            // Handle null pointer (e.g., note not found or error occurred)
439            return Err(Error::NotFound);
440        }
441
442        // Convert the raw pointer to a Note instance
443        Ok(Note::new_transactional(
444            note_ptr,
445            len,
446            NoteKey::new(primkey),
447            transaction,
448        ))
449    }
450
451    pub fn search_profile<'a>(
452        &self,
453        transaction: &'a Transaction,
454        search: &str,
455        limit: u32,
456    ) -> Result<Vec<&'a [u8; 32]>> {
457        let mut results = Vec::new();
458
459        let mut ndb_search = ndb_search {
460            key: std::ptr::null_mut(),
461            profile_key: 0,
462            cursor: std::ptr::null_mut(),
463        };
464
465        let c_query = CString::new(search).map_err(|_| Error::DecodeError)?;
466
467        let success = unsafe {
468            bindings::ndb_search_profile(
469                transaction.as_mut_ptr(),
470                &mut ndb_search as *mut ndb_search,
471                c_query.as_c_str().as_ptr(),
472            )
473        };
474
475        if success == 0 {
476            return Ok(results);
477        }
478
479        // Add the first result
480        if let Some(key) = unsafe { ndb_search.key.as_ref() } {
481            results.push(&key.id);
482        }
483
484        // Iterate through additional results up to the limit
485        let mut remaining = limit;
486        while remaining > 0 {
487            let next_success =
488                unsafe { bindings::ndb_search_profile_next(&mut ndb_search as *mut ndb_search) };
489
490            if next_success == 0 {
491                break;
492            }
493
494            if let Some(key) = unsafe { ndb_search.key.as_ref() } {
495                results.push(&key.id);
496            }
497
498            remaining -= 1;
499        }
500
501        unsafe {
502            bindings::ndb_search_profile_end(&mut ndb_search as *mut ndb_search);
503        }
504
505        Ok(results)
506    }
507
508    /// Get the underlying pointer to the context in C
509    pub fn as_ptr(&self) -> *mut bindings::ndb {
510        self.refs.ndb
511    }
512}
513
514#[cfg(test)]
515mod tests {
516    use super::*;
517    use crate::config::Config;
518    use crate::test_util;
519    use tokio::time::{self, sleep, Duration};
520
521    #[test]
522    fn ndb_init_works() {
523        let db = "target/testdbs/init_works";
524        test_util::cleanup_db(db);
525
526        {
527            let cfg = Config::new();
528            let _ = Ndb::new(db, &cfg).expect("ok");
529        }
530    }
531
532    #[tokio::test]
533    async fn query_works() {
534        let db = "target/testdbs/query";
535        test_util::cleanup_db(&db);
536
537        {
538            let ndb = Ndb::new(db, &Config::new()).expect("ndb");
539
540            let filter = Filter::new().kinds(vec![1]).build();
541            let filters = vec![filter];
542
543            let sub = ndb.subscribe(&filters).expect("sub_id");
544            let waiter = ndb.wait_for_notes(sub, 1);
545            ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
546            let res = waiter.await.expect("await ok");
547            assert_eq!(res, vec![NoteKey::new(1)]);
548            let txn = Transaction::new(&ndb).expect("txn");
549            let res = ndb.query(&txn, &filters, 1).expect("query ok");
550            assert_eq!(res.len(), 1);
551            assert_eq!(
552                hex::encode(res[0].note.id()),
553                "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3"
554            );
555        }
556    }
557
558    #[tokio::test]
559    async fn search_profile_works() {
560        let db = "target/testdbs/search_profile";
561        test_util::cleanup_db(&db);
562
563        {
564            let ndb = Ndb::new(db, &Config::new()).expect("ndb");
565
566            let filter = Filter::new().kinds(vec![0]).build();
567            let filters = vec![filter];
568
569            let sub_id = ndb.subscribe(&filters).expect("sub_id");
570            let mut sub = sub_id.stream(&ndb).notes_per_await(1);
571
572            ndb.process_event(r#"["EVENT","b",{  "id": "0b9f0e14727733e430dcb00c69b12a76a1e100f419ce369df837f7eb33e4523c",  "pubkey": "3f770d65d3a764a9c5cb503ae123e62ec7598ad035d836e2a810f3877a745b24",  "created_at": 1736785355,  "kind": 0,  "tags": [    [      "alt",      "User profile for Derek Ross"    ],    [      "i",      "twitter:derekmross",      "1634343988407726081"    ],    [      "i",      "github:derekross",      "3edaf845975fa4500496a15039323fa3I"    ]  ],  "content": "{\"about\":\"Building NostrPlebs.com and NostrNests.com. The purple pill helps the orange pill go down. Nostr is the social glue that binds all of your apps together.\",\"banner\":\"https://i.nostr.build/O2JE.jpg\",\"display_name\":\"Derek Ross\",\"lud16\":\"derekross@strike.me\",\"name\":\"Derek Ross\",\"nip05\":\"derekross@nostrplebs.com\",\"picture\":\"https://i.nostr.build/MVIJ6OOFSUzzjVEc.jpg\",\"website\":\"https://nostrplebs.com\",\"created_at\":1707238393}",  "sig": "51e1225ccaf9b6739861dc218ac29045b09d5cf3a51b0ac6ea64bd36827d2d4394244e5f58a4e4a324c84eeda060e1a27e267e0d536e5a0e45b0b6bdc2c43bbc"}]"#).unwrap();
573
574            ndb.process_event(r#"["EVENT","b",{  "id": "232a02ec7e1b2febf85370b52ed49bf34e2701c385c3d563511508dcf0767bcf",  "pubkey": "4a0510f26880d40e432f4865cb5714d9d3c200ca6ebb16b418ae6c555f574967",  "created_at": 1736017863,  "kind": 0,  "tags": [    [      "client",      "Damus Notedeck"    ]  ],  "content": "{\"display_name\":\"KernelKind\",\"name\":\"KernelKind\",\"about\":\"hello from notedeck!\",\"lud16\":\"kernelkind@getalby.com\"}",  "sig": "18c7dea0da3c30677d6822a31a6dfd9ebc02a18a31d69f0f2ac9ba88409e437d3db0ac433639111df1e4948a6d18451d1582173ee4fcd018d0ec92939f2c1506"}]"#).unwrap();
575
576            for _ in 0..2 {
577                let _ = sub.next().await;
578            }
579            let txn = Transaction::new(&ndb).expect("txn");
580
581            let res = ndb.search_profile(&txn, "kernel", 1);
582            assert!(res.is_ok());
583            let res = res.unwrap();
584            assert!(res.len() >= 1);
585            let kernelkind_bytes: [u8; 32] = [
586                0x4a, 0x05, 0x10, 0xf2, 0x68, 0x80, 0xd4, 0x0e, 0x43, 0x2f, 0x48, 0x65, 0xcb, 0x57,
587                0x14, 0xd9, 0xd3, 0xc2, 0x00, 0xca, 0x6e, 0xbb, 0x16, 0xb4, 0x18, 0xae, 0x6c, 0x55,
588                0x5f, 0x57, 0x49, 0x67,
589            ];
590            assert_eq!(kernelkind_bytes, **res.first().unwrap());
591
592            let res = ndb.search_profile(&txn, "Derek", 1);
593            assert!(res.is_ok());
594            let res = res.unwrap();
595            assert!(res.len() >= 1);
596            let derek_bytes: [u8; 32] = [
597                0x3f, 0x77, 0x0d, 0x65, 0xd3, 0xa7, 0x64, 0xa9, 0xc5, 0xcb, 0x50, 0x3a, 0xe1, 0x23,
598                0xe6, 0x2e, 0xc7, 0x59, 0x8a, 0xd0, 0x35, 0xd8, 0x36, 0xe2, 0xa8, 0x10, 0xf3, 0x87,
599                0x7a, 0x74, 0x5b, 0x24,
600            ];
601            assert_eq!(derek_bytes, **res.first().unwrap());
602        }
603    }
604
605    #[tokio::test]
606    async fn subscribe_event_works() {
607        let db = "target/testdbs/subscribe";
608        test_util::cleanup_db(&db);
609
610        {
611            let ndb = Ndb::new(db, &Config::new()).expect("ndb");
612
613            let filter = Filter::new().kinds(vec![1]).build();
614
615            let sub = ndb.subscribe(&[filter]).expect("sub_id");
616            let waiter = ndb.wait_for_notes(sub, 1);
617            ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
618            let res = waiter.await.expect("await ok");
619            assert_eq!(res, vec![NoteKey::new(1)]);
620        }
621    }
622
623    #[tokio::test]
624    async fn multiple_events_work() {
625        let db = "target/testdbs/multiple_events";
626        test_util::cleanup_db(&db);
627
628        {
629            let ndb = Ndb::new(db, &Config::new()).expect("ndb");
630
631            let filter = Filter::new().kinds(vec![1]).build();
632
633            let sub_id = ndb.subscribe(&[filter]).expect("sub_id");
634            let mut sub = sub_id.stream(&ndb).notes_per_await(1);
635
636            ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
637            ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok");
638            ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok");
639
640            // this pause causes problems
641            sleep(Duration::from_millis(100)).await;
642
643            ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok");
644            ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok");
645            ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok");
646
647            let timeout_duration = Duration::from_secs(2);
648            let result = time::timeout(timeout_duration, async {
649                let mut count = 0;
650                while count < 6 {
651                    let res = sub.next();
652                    let _ = res.await.expect("await ok");
653                    count += 1;
654                    println!("saw an event, count = {}", count);
655                }
656            })
657            .await;
658
659            match result {
660                Ok(_) => println!("Test completed successfully"),
661                Err(_) => panic!("Test timed out"),
662            }
663        }
664    }
665
666    #[tokio::test]
667    async fn multiple_events_with_final_pause_work() {
668        let db = "target/testdbs/multiple_events_with_final_pause";
669        test_util::cleanup_db(&db);
670
671        {
672            let ndb = Ndb::new(db, &Config::new()).expect("ndb");
673
674            let filter = Filter::new().kinds(vec![1]).build();
675
676            let sub_id = ndb.subscribe(&[filter]).expect("sub_id");
677            let mut sub = sub_id.stream(&ndb).notes_per_await(1);
678
679            ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
680            ndb.process_event(r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#).expect("process ok");
681            ndb.process_event(r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#).expect("process ok");
682
683            sleep(Duration::from_millis(100)).await;
684
685            ndb.process_event(r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#).expect("process ok");
686            ndb.process_event(r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#).expect("process ok");
687            ndb.process_event(r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#).expect("process ok");
688
689            // this final pause causes extra problems
690            sleep(Duration::from_millis(100)).await;
691
692            let timeout_duration = Duration::from_secs(2);
693            let result = time::timeout(timeout_duration, async {
694                let mut count = 0;
695                while count < 6 {
696                    let res = sub.next();
697                    let _ = res.await.expect("await ok");
698                    count += 1;
699                    println!("saw an event, count = {}", count);
700                }
701            })
702            .await;
703
704            match result {
705                Ok(_) => println!("Test completed successfully"),
706                Err(_) => panic!("Test timed out"),
707            }
708        }
709    }
710
/// Polling a kind-1 subscription is expected to yield nothing immediately
/// after `process_event`, and the note key once a 150 ms pause has passed.
#[test]
fn poll_note_works() {
    const HELLO_EVENT: &str = r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#;

    let db = "target/testdbs/poll";
    test_util::cleanup_db(&db);

    let ndb = Ndb::new(db, &Config::new()).expect("ndb");
    let kind_one = Filter::new().kinds(vec![1]).build();
    let sub = ndb.subscribe(&[kind_one]).expect("sub_id");

    ndb.process_event(HELLO_EVENT).expect("process ok");

    // Polling right away is too fast: the test expects no notes yet.
    let early = ndb.poll_for_notes(sub, 1);
    assert_eq!(early, vec![]);

    // After the pause the test expects the first note key to be visible.
    std::thread::sleep(std::time::Duration::from_millis(150));
    let late = ndb.poll_for_notes(sub, 1);
    assert_eq!(late, vec![NoteKey::new(1)]);
}
733
/// Ingests one note, reopens the database from the same directory, and
/// checks that the note can be fetched by its id.
#[test]
fn process_event_works() {
    const NOTE_ID_HEX: &str = "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3";

    let db = "target/testdbs/event_works";
    test_util::cleanup_db(&db);

    // First session: ingest the event; the handle is dropped at scope end.
    {
        let writer = Ndb::new(db, &Config::new()).expect("ndb");
        writer.process_event(r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
    }

    // Second session: reopen the same directory and look the note up by id.
    {
        let reader = Ndb::new(db, &Config::new()).expect("ndb");
        let id_bytes: [u8; 32] = hex::decode(NOTE_ID_HEX)
            .expect("hex id")
            .try_into()
            .expect("id bytes");

        let mut txn = Transaction::new(&reader).expect("txn");
        let stored = reader.get_note_by_id(&mut txn, &id_bytes).expect("note");
        assert_eq!(stored.kind(), 1);
    }
}
755
/// Opening a database with a far-too-large mapsize must still succeed on
/// Windows and leave a non-empty `data.mdb` behind.
///
/// The whole test is compiled only on Windows, so no runtime `cfg!` branch
/// for other platforms is needed inside the body.
#[test]
#[cfg(target_os = "windows")]
fn test_windows_large_mapsize() {
    use std::{fs, path::Path};

    let db = "target/testdbs/windows_large_mapsize";
    test_util::cleanup_db(&db);

    {
        // 32 TiB should be way too big for CI
        let config =
            Config::new().set_mapsize(1024usize * 1024usize * 1024usize * 1024usize * 32usize);

        // in this case, nostrdb should try to keep resizing to
        // smaller mapsizes until success; `expect` surfaces the actual
        // error if it does not. The handle is dropped at the end of this
        // scope, before the data file is inspected.
        let _ndb = Ndb::new(db, &config).expect("ndb should open with an oversized mapsize");
    }

    let file_len = fs::metadata(Path::new(db).join("data.mdb"))
        .expect("metadata")
        .len();

    assert!(file_len > 0);

    // on windows the default mapsize will be 1MB when we fail
    // to open it
    assert_ne!(file_len, 1048576);

    // we should definitely clean this up... especially on windows
    test_util::cleanup_db(&db);
}
794
795    #[tokio::test]
796    async fn test_unsub_on_drop() {
797        let db = "target/testdbs/test_unsub_on_drop";
798        test_util::cleanup_db(&db);
799
800        {
801            let ndb = Ndb::new(db, &Config::new()).expect("ndb");
802            let sub_id = {
803                let filter = Filter::new().kinds(vec![1]).build();
804                let filters = vec![filter];
805
806                let sub_id = ndb.subscribe(&filters).expect("sub_id");
807                let mut sub = sub_id.stream(&ndb).notes_per_await(1);
808
809                let res = sub.next();
810
811                ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
812
813                let res = res.await.expect("await ok");
814                assert_eq!(res, vec![NoteKey::new(1)]);
815
816                assert!(ndb.subs.lock().unwrap().contains_key(&sub_id));
817                sub_id
818            };
819
820            // ensure subscription state is removed after stream is dropped
821            assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id));
822            assert_eq!(ndb.subscription_count(), 0);
823        }
824
825        test_util::cleanup_db(&db);
826    }
827
828    #[tokio::test]
829    async fn test_stream() {
830        let db = "target/testdbs/test_stream";
831        test_util::cleanup_db(&db);
832
833        {
834            let mut ndb = Ndb::new(db, &Config::new()).expect("ndb");
835            let sub_id = {
836                let filter = Filter::new().kinds(vec![1]).build();
837                let filters = vec![filter];
838
839                let sub_id = ndb.subscribe(&filters).expect("sub_id");
840                let mut sub = sub_id.stream(&ndb).notes_per_await(1);
841
842                let res = sub.next();
843
844                let _ = ndb.process_event(r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#).expect("process ok");
845
846                let _ = ndb.process_event(r#"["EVENT",{"content":"👀","created_at":1761514455,"id":"66af95a6bdfec756344f48241562b684082ff9c76ea940c11c4fd85e91e1219c","kind":7,"pubkey":"d5805ae449e108e907091c67cdf49a9835b3cac3dd11489ad215c0ddf7c658fc","sig":"69f4a3fe7c1cc6aa9c9cc4a2e90e4b71c3b9afaad262e68b92336e0493ff1a748b5dcc20ab6e86d4551dc5ea680ddfa1c08d47f9e4845927e143e8ef2183479b","tags":[["e","d44ad96cb8924092a76bc2afddeb12eb85233c0d03a7d9adc42c2a85a79a4305","wss://relay.primal.net/","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"],["p","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9","wss://relay.primal.net/"],["k","1"]]}]"#);
847                let res = res.await.expect("await ok");
848                assert_eq!(res, vec![NoteKey::new(1)]);
849
850                // ensure that unsubscribing kills the stream
851                assert!(ndb.unsubscribe(sub_id).is_ok());
852                assert!(sub.next().await.is_none());
853
854                assert!(ndb.subs.lock().unwrap().contains_key(&sub_id));
855                sub_id
856            };
857
858            // ensure subscription state is removed after stream is dropped
859            assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id));
860        }
861
862        test_util::cleanup_db(&db);
863    }
864
/// Looking up a profile for a pubkey absent from a fresh database must
/// yield `Error::NotFound`.
#[test]
fn get_profile_by_missing_pubkey_returns_not_found() {
    let db = "target/testdbs/missing_pubkey";
    test_util::cleanup_db(&db);

    {
        let ndb = Ndb::new(db, &Config::new()).expect("ndb");
        let txn = Transaction::new(&ndb).expect("txn");

        // All-0xff pubkey: nothing was ingested, so it cannot be in the DB.
        let absent_pubkey: [u8; 32] = [0xff; 32];

        let lookup = ndb.get_profile_by_pubkey(&txn, &absent_pubkey);
        assert!(matches!(lookup, Err(Error::NotFound)));
    }

    test_util::cleanup_db(&db);
}
888
/// Looking up a profile key for a pubkey absent from a fresh database must
/// yield `Error::NotFound`.
#[test]
fn get_profilekey_by_missing_pubkey_returns_not_found() {
    let db = "target/testdbs/missing_profilekey";
    test_util::cleanup_db(&db);

    {
        let ndb = Ndb::new(db, &Config::new()).expect("ndb");
        let txn = Transaction::new(&ndb).expect("txn");

        // All-zero pubkey: nothing was ingested, so it cannot be in the DB.
        let absent_pubkey: [u8; 32] = [0x00; 32];

        let lookup = ndb.get_profilekey_by_pubkey(&txn, &absent_pubkey);
        assert!(matches!(lookup, Err(Error::NotFound)));
    }

    test_util::cleanup_db(&db);
}
907}