1use std::ffi::CString;
2use std::ptr;
3
4use crate::bindings::ndb_search;
5use crate::{
6 bindings, Blocks, Config, Error, Filter, IngestMetadata, Note, NoteKey, NoteMetadata,
7 ProfileKey, ProfileRecord, QueryResult, Result, Subscription, SubscriptionState,
8 SubscriptionStream, Transaction,
9};
10use futures::StreamExt;
11use std::collections::hash_map::Entry;
12use std::collections::HashMap;
13use std::fs;
14use std::os::raw::c_int;
15use std::path::Path;
16use std::sync::{Arc, Mutex};
17use tracing::debug;
18
19#[derive(Debug)]
20struct NdbRef {
21 ndb: *mut bindings::ndb,
22 rust_cb_ctx: *mut ::std::os::raw::c_void,
23}
24
25unsafe impl Send for NdbRef {}
27
28unsafe impl Sync for NdbRef {}
30
31impl Drop for NdbRef {
33 fn drop(&mut self) {
34 unsafe {
35 bindings::ndb_destroy(self.ndb);
36
37 if !self.rust_cb_ctx.is_null() {
38 let _ = Box::from_raw(self.rust_cb_ctx as *mut Box<dyn FnMut()>);
40 }
41 }
42 }
43}
44
45type SubMap = HashMap<Subscription, SubscriptionState>;
46
47#[derive(Debug, Clone)]
49pub struct Ndb {
50 refs: Arc<NdbRef>,
51
52 pub(crate) subs: Arc<Mutex<SubMap>>,
54}
55
56impl Ndb {
57 pub fn new(db_dir: &str, config: &Config) -> Result<Self> {
60 let db_dir_cstr = match CString::new(db_dir) {
61 Ok(cstr) => cstr,
62 Err(_) => return Err(Error::DbOpenFailed),
63 };
64 let mut ndb: *mut bindings::ndb = ptr::null_mut();
65
66 let path = Path::new(db_dir);
67 if !path.exists() {
68 let _ = fs::create_dir_all(path);
69 }
70
71 let min_mapsize = 1024 * 1024 * 512;
72 let mut mapsize = config.config.mapsize;
73 let config = *config;
74
75 let prev_callback = config.config.sub_cb;
76 let prev_callback_ctx = config.config.sub_cb_ctx;
77 let subs = Arc::new(Mutex::new(SubMap::default()));
78 let subs_clone = subs.clone();
79
80 let mut config = config.set_sub_callback(move |sub_id: u64| {
83 let mut map = subs_clone.lock().unwrap();
84 if let Some(s) = map.get_mut(&Subscription::new(sub_id)) {
85 if let Some(w) = s.waker.take() {
86 w.wake();
87 }
88 }
89
90 if let Some(pcb) = prev_callback {
91 unsafe {
92 pcb(prev_callback_ctx, sub_id);
93 };
94 }
95 });
96
97 let result = loop {
98 let result =
99 unsafe { bindings::ndb_init(&mut ndb, db_dir_cstr.as_ptr(), config.as_ptr()) };
100
101 if result == 0 {
102 mapsize /= 2;
103 config = config.set_mapsize(mapsize);
104 debug!("ndb init failed, reducing mapsize to {}", mapsize);
105
106 if mapsize > min_mapsize {
107 continue;
108 } else {
109 break 0;
110 }
111 } else {
112 break result;
113 }
114 };
115
116 if result == 0 {
117 return Err(Error::DbOpenFailed);
118 }
119
120 let rust_cb_ctx = config.config.sub_cb_ctx;
121 let refs = Arc::new(NdbRef { ndb, rust_cb_ctx });
122
123 Ok(Ndb { refs, subs })
124 }
125
126 pub fn process_event_with(&self, json: &str, mut meta: IngestMetadata) -> Result<()> {
130 let c_json = CString::new(json)?;
132 let c_json_ptr = c_json.as_ptr();
133
134 let len = json.len() as libc::c_int;
136
137 let res = unsafe {
138 bindings::ndb_process_event_with(self.as_ptr(), c_json_ptr, len, meta.as_mut_ptr())
139 };
140
141 if res == 0 {
142 return Err(Error::NoteProcessFailed);
143 }
144
145 Ok(())
146 }
147
148 pub fn process_event(&self, json: &str) -> Result<()> {
152 self.process_event_with(json, IngestMetadata::new().client(false))
153 }
154
155 pub fn process_client_event(&self, json: &str) -> Result<()> {
159 self.process_event_with(json, IngestMetadata::new().client(true))
160 }
161
162 pub fn query<'a>(
163 &self,
164 txn: &'a Transaction,
165 filters: &[Filter],
166 max_results: i32,
167 ) -> Result<Vec<QueryResult<'a>>> {
168 let mut ndb_filters: Vec<bindings::ndb_filter> = filters.iter().map(|a| a.data).collect();
169 let mut out: Vec<bindings::ndb_query_result> = vec![];
170 let mut returned: i32 = 0;
171 out.reserve_exact(max_results as usize);
172 let res = unsafe {
173 bindings::ndb_query(
174 txn.as_mut_ptr(),
175 ndb_filters.as_mut_ptr(),
176 ndb_filters.len() as i32,
177 out.as_mut_ptr(),
178 max_results,
179 &mut returned as *mut i32,
180 )
181 };
182 if res == 1 {
183 unsafe {
184 out.set_len(returned as usize);
185 };
186 Ok(out.iter().map(|r| QueryResult::new(r, txn)).collect())
187 } else {
188 Err(Error::QueryError)
189 }
190 }
191
192 pub fn subscription_count(&self) -> u32 {
193 unsafe { bindings::ndb_num_subscriptions(self.as_ptr()) as u32 }
194 }
195
196 pub fn unsubscribe(&mut self, sub: Subscription) -> Result<()> {
197 let r = unsafe { bindings::ndb_unsubscribe(self.as_ptr(), sub.id()) };
198
199 debug!(
200 "unsubscribed from {}, sub count {}",
201 sub.id(),
202 self.subscription_count()
203 );
204
205 {
207 let mut map = self.subs.lock().unwrap();
208 if let Entry::Occupied(mut entry) = map.entry(sub) {
209 entry.get_mut().done = true;
210 }
211 }
212
213 if r == 0 {
214 Err(Error::SubscriptionError)
215 } else {
216 Ok(())
217 }
218 }
219
220 pub fn subscribe(&self, filters: &[Filter]) -> Result<Subscription> {
221 unsafe {
222 let mut ndb_filters: Vec<bindings::ndb_filter> =
223 filters.iter().map(|a| a.data).collect();
224 let id = bindings::ndb_subscribe(
225 self.as_ptr(),
226 ndb_filters.as_mut_ptr(),
227 filters.len() as i32,
228 );
229 if id == 0 {
230 Err(Error::SubscriptionError)
231 } else {
232 Ok(Subscription::new(id))
233 }
234 }
235 }
236
237 pub fn poll_for_notes(&self, sub: Subscription, max_notes: u32) -> Vec<NoteKey> {
238 let mut vec = vec![];
239 vec.reserve_exact(max_notes as usize);
240
241 unsafe {
242 let res = bindings::ndb_poll_for_notes(
243 self.as_ptr(),
244 sub.id(),
245 vec.as_mut_ptr(),
246 max_notes as c_int,
247 );
248 vec.set_len(res as usize);
249 };
250
251 vec.into_iter().map(NoteKey::new).collect()
252 }
253
254 pub async fn wait_for_notes(
255 &self,
256 sub_id: Subscription,
257 max_notes: u32,
258 ) -> Result<Vec<NoteKey>> {
259 let mut stream = SubscriptionStream::new(self.clone(), sub_id).notes_per_await(max_notes);
260
261 match stream.next().await {
262 Some(res) => Ok(res),
263 None => Err(Error::SubscriptionError),
264 }
265 }
266
267 pub fn get_profile_by_key<'a>(
268 &self,
269 transaction: &'a Transaction,
270 key: ProfileKey,
271 ) -> Result<ProfileRecord<'a>> {
272 let mut len: usize = 0;
273
274 let profile_record_ptr = unsafe {
275 bindings::ndb_get_profile_by_key(transaction.as_mut_ptr(), key.as_u64(), &mut len)
276 };
277
278 if profile_record_ptr.is_null() {
279 return Err(Error::NotFound);
281 }
282
283 Ok(ProfileRecord::new_transactional(
285 profile_record_ptr,
286 len,
287 key,
288 transaction,
289 ))
290 }
291
292 pub fn get_profile_by_pubkey<'a>(
293 &self,
294 transaction: &'a Transaction,
295 id: &[u8; 32],
296 ) -> Result<ProfileRecord<'a>> {
297 let mut len: usize = 0;
298 let mut primkey: u64 = 0;
299
300 let profile_record_ptr = unsafe {
301 bindings::ndb_get_profile_by_pubkey(
302 transaction.as_mut_ptr(),
303 id.as_ptr(),
304 &mut len,
305 &mut primkey,
306 )
307 };
308
309 if profile_record_ptr.is_null() {
310 return Err(Error::NotFound);
312 }
313
314 Ok(ProfileRecord::new_transactional(
316 profile_record_ptr,
317 len,
318 ProfileKey::new(primkey),
319 transaction,
320 ))
321 }
322
323 pub fn get_note_metadata<'a>(
324 &self,
325 txn: &'a Transaction,
326 id: &[u8; 32],
327 ) -> Result<NoteMetadata<'a>> {
328 let res = unsafe {
329 let res = bindings::ndb_get_note_meta(
330 txn.as_mut_ptr(),
331 id.as_ptr() as *const ::std::os::raw::c_uchar,
332 );
333
334 if res.is_null() {
335 return Err(Error::NotFound);
336 }
337
338 &mut *res
339 };
340
341 Ok(NoteMetadata::new(res))
342 }
343
344 pub fn get_notekey_by_id(&self, txn: &Transaction, id: &[u8; 32]) -> Result<NoteKey> {
345 let res = unsafe {
346 bindings::ndb_get_notekey_by_id(
347 txn.as_mut_ptr(),
348 id.as_ptr() as *const ::std::os::raw::c_uchar,
349 )
350 };
351
352 if res == 0 {
353 return Err(Error::NotFound);
354 }
355
356 Ok(NoteKey::new(res))
357 }
358
359 pub fn get_profilekey_by_pubkey(
360 &self,
361 txn: &Transaction,
362 pubkey: &[u8; 32],
363 ) -> Result<ProfileKey> {
364 let res = unsafe {
365 bindings::ndb_get_profilekey_by_pubkey(
366 txn.as_mut_ptr(),
367 pubkey.as_ptr() as *const ::std::os::raw::c_uchar,
368 )
369 };
370
371 if res == 0 {
372 return Err(Error::NotFound);
373 }
374
375 Ok(ProfileKey::new(res))
376 }
377
378 pub fn get_blocks_by_key<'a>(
379 &self,
380 txn: &'a Transaction,
381 note_key: NoteKey,
382 ) -> Result<Blocks<'a>> {
383 let blocks_ptr = unsafe {
384 bindings::ndb_get_blocks_by_key(self.as_ptr(), txn.as_mut_ptr(), note_key.as_u64())
385 };
386
387 if blocks_ptr.is_null() {
388 return Err(Error::NotFound);
389 }
390
391 Ok(Blocks::new_transactional(blocks_ptr, txn))
392 }
393
394 pub fn get_note_by_key<'a>(
395 &self,
396 transaction: &'a Transaction,
397 note_key: NoteKey,
398 ) -> Result<Note<'a>> {
399 let mut len: usize = 0;
400
401 let note_ptr = unsafe {
402 bindings::ndb_get_note_by_key(transaction.as_mut_ptr(), note_key.as_u64(), &mut len)
403 };
404
405 if note_ptr.is_null() {
406 return Err(Error::NotFound);
408 }
409
410 Ok(Note::new_transactional(
412 note_ptr,
413 len,
414 note_key,
415 transaction,
416 ))
417 }
418
419 pub fn get_note_by_id<'a>(
421 &self,
422 transaction: &'a Transaction,
423 id: &[u8; 32],
424 ) -> Result<Note<'a>> {
425 let mut len: usize = 0;
426 let mut primkey: u64 = 0;
427
428 let note_ptr = unsafe {
429 bindings::ndb_get_note_by_id(
430 transaction.as_mut_ptr(),
431 id.as_ptr(),
432 &mut len,
433 &mut primkey,
434 )
435 };
436
437 if note_ptr.is_null() {
438 return Err(Error::NotFound);
440 }
441
442 Ok(Note::new_transactional(
444 note_ptr,
445 len,
446 NoteKey::new(primkey),
447 transaction,
448 ))
449 }
450
451 pub fn search_profile<'a>(
452 &self,
453 transaction: &'a Transaction,
454 search: &str,
455 limit: u32,
456 ) -> Result<Vec<&'a [u8; 32]>> {
457 let mut results = Vec::new();
458
459 let mut ndb_search = ndb_search {
460 key: std::ptr::null_mut(),
461 profile_key: 0,
462 cursor: std::ptr::null_mut(),
463 };
464
465 let c_query = CString::new(search).map_err(|_| Error::DecodeError)?;
466
467 let success = unsafe {
468 bindings::ndb_search_profile(
469 transaction.as_mut_ptr(),
470 &mut ndb_search as *mut ndb_search,
471 c_query.as_c_str().as_ptr(),
472 )
473 };
474
475 if success == 0 {
476 return Ok(results);
477 }
478
479 if let Some(key) = unsafe { ndb_search.key.as_ref() } {
481 results.push(&key.id);
482 }
483
484 let mut remaining = limit;
486 while remaining > 0 {
487 let next_success =
488 unsafe { bindings::ndb_search_profile_next(&mut ndb_search as *mut ndb_search) };
489
490 if next_success == 0 {
491 break;
492 }
493
494 if let Some(key) = unsafe { ndb_search.key.as_ref() } {
495 results.push(&key.id);
496 }
497
498 remaining -= 1;
499 }
500
501 unsafe {
502 bindings::ndb_search_profile_end(&mut ndb_search as *mut ndb_search);
503 }
504
505 Ok(results)
506 }
507
508 pub fn as_ptr(&self) -> *mut bindings::ndb {
510 self.refs.ndb
511 }
512}
513
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::test_util;
    use tokio::time::{self, sleep, Duration};

    /// Kind-1 "hello, world" note inside an `EVENT` message with subscription id "b".
    const HELLO_WORLD_EVENT: &str = r#"["EVENT","b",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#;

    /// The same "hello, world" note, sent with subscription id "s".
    const HELLO_WORLD_EVENT_SUB_S: &str = r#"["EVENT","s",{"id": "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3","pubkey": "32bf915904bfde2d136ba45dde32c88f4aca863783999faea2e847a8fafd2f15","created_at": 1702675561,"kind": 1,"tags": [],"content": "hello, world","sig": "2275c5f5417abfd644b7bc74f0388d70feb5d08b6f90fa18655dda5c95d013bfbc5258ea77c05b7e40e0ee51d8a2efa931dc7a0ec1db4c0a94519762c6625675"}]"#;

    /// Kind-1 note: public-key verification message.
    const VERIFY_KEY_EVENT: &str = r#"["EVENT","b",{"id":"d379f55b520a9b2442556917e2cc7b7c16bfe3f4f08856dcc5735eadb2706267","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1720482500,"kind":1,"tags":[["p","5e7ae588d7d11eac4c25906e6da807e68c6498f49a38e4692be5a089616ceb18"]],"content":"@npub1teawtzxh6y02cnp9jphxm2q8u6xxfx85nguwg6ftuksgjctvavvqnsgq5u Verifying My Public Key: \"ksedgwic\"\n","sig":"3e8683490d951e0f5b3b59835063684d3d159322394d2aad3ee027890dcf8d9ff337027f07ec9c5f9799195466723bc459c67fbf3c902ad40a6b51bcb45d3feb"}]"#;

    /// Kind-1 note: "testing blocked pubkey".
    const BLOCKED_PUBKEY_EVENT: &str = r#"["EVENT","b",{"id":"8600bdc1f35ec4662b32609e93cc51a42e5ea9f6b8d656ca9d6b541310052885","pubkey":"dcdc0e77fe223f3f62a476578350133ca97767927df676ca7ca7b92a413a7703","created_at":1734636009,"kind":1,"tags":[],"content":"testing blocked pubkey","sig":"e8949493d81474085cd084d3b81e48b1673fcb2c738a9e7c130915fc85944e787885577b71be6a0822df10f7e823229417774d1e6a66e5cfac9d151f460a5291"}]"#;

    /// Kind-1 reply mentioning meshtastic devices.
    const MESHTASTIC_EVENT: &str = r#"["EVENT","b",{"id":"e3ba832d4399528beb1c677a50d139c94e67220600dd424eb3ad3fa673a45dd5","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1735920949,"kind":1,"tags":[["e","83e37c70a84df8a9b1fe85df15fb892a3852f3a9acc8f9af34449772b1cb07f3","","root"],["e","a3ed05a377b1c1f460fa4e9c2dd393e9563dd2da6955d48287847278d1039277","","reply"],["p","37f2654c028c224b36507facf80c62d53b6c2eebb8d5590aa238d71d3c48723a"],["p","d4bad8c24d4bee499afb08830e71dd103e61e007556d20ba2ef3867fb57136de"],["r","https://meshtastic.org/docs/hardware/devices/"]],"content":"I think anything on this list that runs stock meshtastic should work. You do need a USB connection for the early proof of concept \nhttps://meshtastic.org/docs/hardware/devices/\n\nOthers might have better advice about which are the best though","sig":"85318ea5b83c3316063be82a6e45180767e9ea6b114d0a181dde7d4dc040f2c7f86f8750cc106b66bf666a4ac2debfd8b07c986b7814a715e3ea1cb42626cc68"}]"#;

    /// Kind-1 reply: "Works great on Fedora too".
    const FEDORA_EVENT: &str = r#"["EVENT","b",{"id":"d7ba624865319e95f49c30f5d9644525ab2daaba4e503ecb125798ff038fef13","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1732839586,"kind":1,"tags":[["e","57f1ec61f29d01e2171089aaa86a43694e05ac68507ba7b540e1b968d14f45c2","","root"],["e","77e8e33005b7139901b7e3100eff1043ea4f1faa491c678e8ba9aa3b324011d1"],["e","6eb98593d806ba5fe0ab9aa0e50591af9bbbc7874401183daf59ce788a4bf79f","","reply"],["p","1fccce68f977187c91a7091ece205e214d436eeb8049bc72e266cf4f976d8f77"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"]],"content":"Works great on Fedora too","sig":"559ac1e852ddedd489fbfc600e4a69f1d182c57fb7dc89e0b3c385cb40ef6e4aff137a34da55b2504798171e957dd39bef57bd3bf946ee70e2eb4023bb446c8b"}]"#;

    /// Kind-1 reply with empty content.
    const EMPTY_CONTENT_EVENT: &str = r#"["EVENT","b",{"id":"242ae4cf1c719e2c4b656a3aac47c860b1a3ee7bf85c2317e660e27904438b08","pubkey":"850605096dbfb50b929e38a6c26c3d56c425325c85e05de29b759bc0e5d6cebc","created_at":1729652152,"kind":1,"tags":[["e","760f76e66e1046066f134367e2da93f1ac4c8d9d6b7b5e0b990c6725fe8d1442","","root"],["e","85575dbb1aeca2c7875e242351394d9c21ca0bc41946de069b267aeb9e672774","","reply"],["p","7c765d407d3a9d5ea117cb8b8699628560787fc084a0c76afaa449bfbd121d84"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"]],"content":"","sig":"3ab9c19640a2efb55510f9ac2e12117582bc94ef985fac33f6f4c6d8fecc3a4e83647a347772aad3cfb12a8ee91649b36feee7b66bc8b61d5232aca29afc4186"}]"#;

    /// Kind-0 profile for "Derek Ross".
    const DEREK_PROFILE_EVENT: &str = r#"["EVENT","b",{ "id": "0b9f0e14727733e430dcb00c69b12a76a1e100f419ce369df837f7eb33e4523c", "pubkey": "3f770d65d3a764a9c5cb503ae123e62ec7598ad035d836e2a810f3877a745b24", "created_at": 1736785355, "kind": 0, "tags": [ [ "alt", "User profile for Derek Ross" ], [ "i", "twitter:derekmross", "1634343988407726081" ], [ "i", "github:derekross", "3edaf845975fa4500496a15039323fa3I" ] ], "content": "{\"about\":\"Building NostrPlebs.com and NostrNests.com. The purple pill helps the orange pill go down. Nostr is the social glue that binds all of your apps together.\",\"banner\":\"https://i.nostr.build/O2JE.jpg\",\"display_name\":\"Derek Ross\",\"lud16\":\"derekross@strike.me\",\"name\":\"Derek Ross\",\"nip05\":\"derekross@nostrplebs.com\",\"picture\":\"https://i.nostr.build/MVIJ6OOFSUzzjVEc.jpg\",\"website\":\"https://nostrplebs.com\",\"created_at\":1707238393}", "sig": "51e1225ccaf9b6739861dc218ac29045b09d5cf3a51b0ac6ea64bd36827d2d4394244e5f58a4e4a324c84eeda060e1a27e267e0d536e5a0e45b0b6bdc2c43bbc"}]"#;

    /// Kind-0 profile for "KernelKind".
    const KERNELKIND_PROFILE_EVENT: &str = r#"["EVENT","b",{ "id": "232a02ec7e1b2febf85370b52ed49bf34e2701c385c3d563511508dcf0767bcf", "pubkey": "4a0510f26880d40e432f4865cb5714d9d3c200ca6ebb16b418ae6c555f574967", "created_at": 1736017863, "kind": 0, "tags": [ [ "client", "Damus Notedeck" ] ], "content": "{\"display_name\":\"KernelKind\",\"name\":\"KernelKind\",\"about\":\"hello from notedeck!\",\"lud16\":\"kernelkind@getalby.com\"}", "sig": "18c7dea0da3c30677d6822a31a6dfd9ebc02a18a31d69f0f2ac9ba88409e437d3db0ac433639111df1e4948a6d18451d1582173ee4fcd018d0ec92939f2c1506"}]"#;

    /// Kind-7 reaction sent without a subscription id element.
    const REACTION_EVENT: &str = r#"["EVENT",{"content":"👀","created_at":1761514455,"id":"66af95a6bdfec756344f48241562b684082ff9c76ea940c11c4fd85e91e1219c","kind":7,"pubkey":"d5805ae449e108e907091c67cdf49a9835b3cac3dd11489ad215c0ddf7c658fc","sig":"69f4a3fe7c1cc6aa9c9cc4a2e90e4b71c3b9afaad262e68b92336e0493ff1a748b5dcc20ab6e86d4551dc5ea680ddfa1c08d47f9e4845927e143e8ef2183479b","tags":[["e","d44ad96cb8924092a76bc2afddeb12eb85233c0d03a7d9adc42c2a85a79a4305","wss://relay.primal.net/","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"],["p","04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9","wss://relay.primal.net/"],["k","1"]]}]"#;

    /// Hex id of the "hello, world" note.
    const HELLO_WORLD_ID: &str =
        "702555e52e82cc24ad517ba78c21879f6e47a7c0692b9b20df147916ae8731a3";

    /// Opening a fresh database with the default config succeeds.
    #[test]
    fn ndb_init_works() {
        let db = "target/testdbs/init_works";
        test_util::cleanup_db(db);

        let cfg = Config::new();
        let _ = Ndb::new(db, &cfg).expect("ok");
    }

    /// A processed note is delivered to a waiting subscription and can then
    /// be found again through `query`.
    #[tokio::test]
    async fn query_works() {
        let db = "target/testdbs/query";
        test_util::cleanup_db(db);

        let ndb = Ndb::new(db, &Config::new()).expect("ndb");
        let filters = vec![Filter::new().kinds(vec![1]).build()];

        let sub = ndb.subscribe(&filters).expect("sub_id");
        let waiter = ndb.wait_for_notes(sub, 1);
        ndb.process_event(HELLO_WORLD_EVENT).expect("process ok");

        let keys = waiter.await.expect("await ok");
        assert_eq!(keys, vec![NoteKey::new(1)]);

        let txn = Transaction::new(&ndb).expect("txn");
        let found = ndb.query(&txn, &filters, 1).expect("query ok");
        assert_eq!(found.len(), 1);
        assert_eq!(hex::encode(found[0].note.id()), HELLO_WORLD_ID);
    }

    /// Profile search returns the matching profile's public key first.
    #[tokio::test]
    async fn search_profile_works() {
        let db = "target/testdbs/search_profile";
        test_util::cleanup_db(db);

        let ndb = Ndb::new(db, &Config::new()).expect("ndb");
        let filters = vec![Filter::new().kinds(vec![0]).build()];

        let sub_id = ndb.subscribe(&filters).expect("sub_id");
        let mut sub = sub_id.stream(&ndb).notes_per_await(1);

        ndb.process_event(DEREK_PROFILE_EVENT).unwrap();
        ndb.process_event(KERNELKIND_PROFILE_EVENT).unwrap();

        // Wait until both profiles have been reported on the subscription.
        for _ in 0..2 {
            let _ = sub.next().await;
        }
        let txn = Transaction::new(&ndb).expect("txn");

        let res = ndb.search_profile(&txn, "kernel", 1);
        assert!(res.is_ok());
        let res = res.unwrap();
        assert!(!res.is_empty());
        let kernelkind_bytes: [u8; 32] = [
            0x4a, 0x05, 0x10, 0xf2, 0x68, 0x80, 0xd4, 0x0e, 0x43, 0x2f, 0x48, 0x65, 0xcb, 0x57,
            0x14, 0xd9, 0xd3, 0xc2, 0x00, 0xca, 0x6e, 0xbb, 0x16, 0xb4, 0x18, 0xae, 0x6c, 0x55,
            0x5f, 0x57, 0x49, 0x67,
        ];
        assert_eq!(kernelkind_bytes, **res.first().unwrap());

        let res = ndb.search_profile(&txn, "Derek", 1);
        assert!(res.is_ok());
        let res = res.unwrap();
        assert!(!res.is_empty());
        let derek_bytes: [u8; 32] = [
            0x3f, 0x77, 0x0d, 0x65, 0xd3, 0xa7, 0x64, 0xa9, 0xc5, 0xcb, 0x50, 0x3a, 0xe1, 0x23,
            0xe6, 0x2e, 0xc7, 0x59, 0x8a, 0xd0, 0x35, 0xd8, 0x36, 0xe2, 0xa8, 0x10, 0xf3, 0x87,
            0x7a, 0x74, 0x5b, 0x24,
        ];
        assert_eq!(derek_bytes, **res.first().unwrap());
    }

    /// A waiter created before the event is processed receives its key.
    #[tokio::test]
    async fn subscribe_event_works() {
        let db = "target/testdbs/subscribe";
        test_util::cleanup_db(db);

        let ndb = Ndb::new(db, &Config::new()).expect("ndb");
        let filter = Filter::new().kinds(vec![1]).build();

        let sub = ndb.subscribe(&[filter]).expect("sub_id");
        let waiter = ndb.wait_for_notes(sub, 1);
        ndb.process_event(HELLO_WORLD_EVENT).expect("process ok");

        let keys = waiter.await.expect("await ok");
        assert_eq!(keys, vec![NoteKey::new(1)]);
    }

    /// Six events processed in two bursts are all delivered, one per await.
    #[tokio::test]
    async fn multiple_events_work() {
        let db = "target/testdbs/multiple_events";
        test_util::cleanup_db(db);

        let ndb = Ndb::new(db, &Config::new()).expect("ndb");
        let filter = Filter::new().kinds(vec![1]).build();

        let sub_id = ndb.subscribe(&[filter]).expect("sub_id");
        let mut sub = sub_id.stream(&ndb).notes_per_await(1);

        ndb.process_event(HELLO_WORLD_EVENT).expect("process ok");
        ndb.process_event(VERIFY_KEY_EVENT).expect("process ok");
        ndb.process_event(BLOCKED_PUBKEY_EVENT).expect("process ok");

        sleep(Duration::from_millis(100)).await;

        ndb.process_event(MESHTASTIC_EVENT).expect("process ok");
        ndb.process_event(FEDORA_EVENT).expect("process ok");
        ndb.process_event(EMPTY_CONTENT_EVENT).expect("process ok");

        let outcome = time::timeout(Duration::from_secs(2), async {
            for count in 1..=6 {
                let _ = sub.next().await.expect("await ok");
                println!("saw an event, count = {}", count);
            }
        })
        .await;

        if outcome.is_err() {
            panic!("Test timed out");
        }
        println!("Test completed successfully");
    }

    /// Same as `multiple_events_work`, but with a pause after the last burst
    /// before the stream is first polled.
    #[tokio::test]
    async fn multiple_events_with_final_pause_work() {
        let db = "target/testdbs/multiple_events_with_final_pause";
        test_util::cleanup_db(db);

        let ndb = Ndb::new(db, &Config::new()).expect("ndb");
        let filter = Filter::new().kinds(vec![1]).build();

        let sub_id = ndb.subscribe(&[filter]).expect("sub_id");
        let mut sub = sub_id.stream(&ndb).notes_per_await(1);

        ndb.process_event(HELLO_WORLD_EVENT).expect("process ok");
        ndb.process_event(VERIFY_KEY_EVENT).expect("process ok");
        ndb.process_event(BLOCKED_PUBKEY_EVENT).expect("process ok");

        sleep(Duration::from_millis(100)).await;

        ndb.process_event(MESHTASTIC_EVENT).expect("process ok");
        ndb.process_event(FEDORA_EVENT).expect("process ok");
        ndb.process_event(EMPTY_CONTENT_EVENT).expect("process ok");

        sleep(Duration::from_millis(100)).await;

        let outcome = time::timeout(Duration::from_secs(2), async {
            for count in 1..=6 {
                let _ = sub.next().await.expect("await ok");
                println!("saw an event, count = {}", count);
            }
        })
        .await;

        if outcome.is_err() {
            panic!("Test timed out");
        }
        println!("Test completed successfully");
    }

    /// Polling right after processing sees nothing; polling again after a
    /// short wait sees the note.
    #[test]
    fn poll_note_works() {
        let db = "target/testdbs/poll";
        test_util::cleanup_db(db);

        let ndb = Ndb::new(db, &Config::new()).expect("ndb");
        let filter = Filter::new().kinds(vec![1]).build();

        let sub = ndb.subscribe(&[filter]).expect("sub_id");
        ndb.process_event(HELLO_WORLD_EVENT).expect("process ok");

        let polled = ndb.poll_for_notes(sub, 1);
        assert_eq!(polled, vec![]);

        std::thread::sleep(std::time::Duration::from_millis(150));

        let polled = ndb.poll_for_notes(sub, 1);
        assert_eq!(polled, vec![NoteKey::new(1)]);
    }

    /// A note processed by one handle is readable after reopening the
    /// database.
    #[test]
    fn process_event_works() {
        let db = "target/testdbs/event_works";
        test_util::cleanup_db(db);

        {
            let ndb = Ndb::new(db, &Config::new()).expect("ndb");
            ndb.process_event(HELLO_WORLD_EVENT_SUB_S).expect("process ok");
        }

        {
            let ndb = Ndb::new(db, &Config::new()).expect("ndb");
            let id_bytes: [u8; 32] = hex::decode(HELLO_WORLD_ID)
                .expect("hex id")
                .try_into()
                .expect("id bytes");
            let txn = Transaction::new(&ndb).expect("txn");
            let note = ndb.get_note_by_id(&txn, &id_bytes).expect("note");
            assert_eq!(note.kind(), 1);
        }
    }

    /// Opening with a 32 TiB map size still succeeds on Windows.
    #[test]
    #[cfg(target_os = "windows")]
    fn test_windows_large_mapsize() {
        use std::{fs, path::Path};

        let db = "target/testdbs/windows_large_mapsize";
        test_util::cleanup_db(db);

        {
            let config =
                Config::new().set_mapsize(1024usize * 1024usize * 1024usize * 1024usize * 32usize);

            let ndb = Ndb::new(db, &config);

            assert!(ndb.is_ok());
        }

        let file_len = fs::metadata(Path::new(db).join("data.mdb"))
            .expect("metadata")
            .len();

        assert!(file_len > 0);

        // The whole test is compiled for Windows only, so the `else` arm is
        // never taken in practice; it is kept as in the original.
        if cfg!(target_os = "windows") {
            assert_ne!(file_len, 1048576);
        } else {
            assert!(file_len < 1024u64 * 1024u64);
        }

        test_util::cleanup_db(db);
    }

    /// Dropping a subscription stream removes its table entry and leaves no
    /// subscription registered.
    #[tokio::test]
    async fn test_unsub_on_drop() {
        let db = "target/testdbs/test_unsub_on_drop";
        test_util::cleanup_db(db);

        {
            let ndb = Ndb::new(db, &Config::new()).expect("ndb");
            let sub_id = {
                let filters = vec![Filter::new().kinds(vec![1]).build()];

                let sub_id = ndb.subscribe(&filters).expect("sub_id");
                let mut sub = sub_id.stream(&ndb).notes_per_await(1);

                let pending = sub.next();

                ndb.process_event(HELLO_WORLD_EVENT).expect("process ok");

                let keys = pending.await.expect("await ok");
                assert_eq!(keys, vec![NoteKey::new(1)]);

                assert!(ndb.subs.lock().unwrap().contains_key(&sub_id));
                sub_id
            };

            // The stream went out of scope at the end of the block above.
            assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id));
            assert_eq!(ndb.subscription_count(), 0);
        }

        test_util::cleanup_db(db);
    }

    /// After `unsubscribe` the stream ends, and its table entry goes away
    /// once the stream is dropped.
    #[tokio::test]
    async fn test_stream() {
        let db = "target/testdbs/test_stream";
        test_util::cleanup_db(db);

        {
            let mut ndb = Ndb::new(db, &Config::new()).expect("ndb");
            let sub_id = {
                let filters = vec![Filter::new().kinds(vec![1]).build()];

                let sub_id = ndb.subscribe(&filters).expect("sub_id");
                let mut sub = sub_id.stream(&ndb).notes_per_await(1);

                let pending = sub.next();

                ndb.process_event(HELLO_WORLD_EVENT).expect("process ok");

                // The outcome of this second event is deliberately ignored.
                let _ = ndb.process_event(REACTION_EVENT);

                let keys = pending.await.expect("await ok");
                assert_eq!(keys, vec![NoteKey::new(1)]);

                assert!(ndb.unsubscribe(sub_id).is_ok());
                assert!(sub.next().await.is_none());

                assert!(ndb.subs.lock().unwrap().contains_key(&sub_id));
                sub_id
            };

            assert!(!ndb.subs.lock().unwrap().contains_key(&sub_id));
        }

        test_util::cleanup_db(db);
    }

    /// Looking up a profile for an unknown public key yields `NotFound`.
    #[test]
    fn get_profile_by_missing_pubkey_returns_not_found() {
        let db = "target/testdbs/missing_pubkey";
        test_util::cleanup_db(db);

        {
            let ndb = Ndb::new(db, &Config::new()).expect("ndb");
            let txn = Transaction::new(&ndb).expect("txn");

            let unknown_pubkey: [u8; 32] = [0xff; 32];

            let result = ndb.get_profile_by_pubkey(&txn, &unknown_pubkey);
            assert!(matches!(result, Err(Error::NotFound)));
        }

        test_util::cleanup_db(db);
    }

    /// Looking up a profile key for an unknown public key yields `NotFound`.
    #[test]
    fn get_profilekey_by_missing_pubkey_returns_not_found() {
        let db = "target/testdbs/missing_profilekey";
        test_util::cleanup_db(db);

        {
            let ndb = Ndb::new(db, &Config::new()).expect("ndb");
            let txn = Transaction::new(&ndb).expect("txn");

            let unknown_pubkey: [u8; 32] = [0x00; 32];

            let result = ndb.get_profilekey_by_pubkey(&txn, &unknown_pubkey);
            assert!(matches!(result, Err(Error::NotFound)));
        }

        test_util::cleanup_db(db);
    }
}