1pub mod crawl;
4pub mod tree_event_snapshots;
5pub use crawl::{
6 CrawlConfig, CrawlError, CrawlReport, EventSelectionPolicy, KindPriorityPolicy, NostrBridge,
7 RelayFetchMode,
8};
9pub use tree_event_snapshots::{
10 compare_tree_event_snapshots, is_newer_tree_event_snapshot,
11 parse_tree_event_snapshot_permalink, read_tree_event_snapshot, resolve_snapshot_root_cid,
12 serialize_tree_event_snapshot_permalink, snapshot_matches_root_cid, store_tree_event_snapshot,
13 TreeEventSnapshotInfo, TreeEventSnapshotPermalink,
14};
15
16use std::collections::BTreeMap;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use std::cell::RefCell;
22
23use futures::{stream, StreamExt, TryStreamExt};
24use hashtree_collection::{
25 load_collection_manifest_metadata, load_collection_state, CollectionDefinition,
26 CollectionOptions, CollectionPublishedSchema, CollectionSource, CollectionState,
27 CollectionWriter,
28};
29use hashtree_core::{
30 sha256, BufferedStore, Cid, HashTree, HashTreeConfig, HashTreeError, Store, TreeVisibility,
31};
32use hashtree_index::{BTree, BTreeError, BTreeOptions};
33use nostr_sdk::nips::nip44::{self, Version as Nip44Version};
34use nostr_sdk::{
35 Alphabet, Event, EventBuilder, Keys, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
36};
37use serde::{Deserialize, Serialize};
38
39pub const NOSTR_EVENT_ENVELOPE_VERSION: u8 = 1;
40pub const HASHTREE_ROOT_KIND: u32 = 30078;
41pub const HASHTREE_LABEL: &str = "hashtree";
42pub const TAG_HASH: &str = "hash";
43pub const TAG_KEY: &str = "key";
44pub const TAG_ENCRYPTED_KEY: &str = "encryptedKey";
45pub const TAG_KEY_ID: &str = "keyId";
46pub const TAG_SELF_ENCRYPTED_KEY: &str = "selfEncryptedKey";
47pub const TAG_SELF_ENCRYPTED_LINK_KEY: &str = "selfEncryptedLinkKey";
48const MANIFEST_BY_AUTHOR_TIME: &str = "by-author-time";
49const MANIFEST_BY_AUTHOR_KIND_TIME: &str = "by-author-kind-time";
50const MANIFEST_BY_KIND_TIME: &str = "by-kind-time";
51const MANIFEST_BY_KIND_TIME_AUTHOR: &str = "by-kind-time-author";
52const MANIFEST_BY_TIME: &str = "by-time";
53const MANIFEST_BY_TAG: &str = "by-tag";
54const MANIFEST_REPLACEABLE: &str = "replaceable";
55const MANIFEST_PARAMETERIZED_REPLACEABLE: &str = "parameterized-replaceable";
56const EVENT_BLOB_WRITE_CONCURRENCY: usize = 64;
57const NOSTR_EVENT_ITEM_FORMAT: &str = "nostr/event@1";
58const NOSTR_EVENT_PROJECTION_FORMAT: &str = "hashtree/nostr-event-index@1";
59const MAX_SNAPSHOT_BYTES: usize = 256 * 1024;
60
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub struct StoredNostrEvent {
63 pub id: String,
64 pub pubkey: String,
65 pub created_at: u64,
66 pub kind: u32,
67 pub tags: Vec<Vec<String>>,
68 pub content: String,
69 pub sig: String,
70}
71
72#[derive(Debug, Clone)]
74pub struct VerifiedEvent(Event);
75
76impl VerifiedEvent {
77 pub fn as_event(&self) -> &Event {
78 &self.0
79 }
80
81 pub fn into_event(self) -> Event {
82 self.0
83 }
84
85 pub fn to_stored_event(&self) -> VerifiedStoredNostrEvent {
86 VerifiedStoredNostrEvent {
87 event: stored_event_from_nostr_sdk_event(&self.0),
88 }
89 }
90}
91
92impl TryFrom<Event> for VerifiedEvent {
93 type Error = NostrEventStoreError;
94
95 fn try_from(event: Event) -> Result<Self, Self::Error> {
96 verify_nostr_sdk_event(&event)?;
97 Ok(Self(event))
98 }
99}
100
101impl AsRef<Event> for VerifiedEvent {
102 fn as_ref(&self) -> &Event {
103 self.as_event()
104 }
105}
106
107#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct VerifiedStoredNostrEvent {
110 event: StoredNostrEvent,
111}
112
113impl VerifiedStoredNostrEvent {
114 pub fn as_stored(&self) -> &StoredNostrEvent {
115 &self.event
116 }
117
118 pub fn into_stored(self) -> StoredNostrEvent {
119 self.event
120 }
121
122 pub fn to_nostr_sdk_event(&self) -> Result<VerifiedEvent, NostrEventStoreError> {
123 VerifiedEvent::try_from(nostr_sdk_event_from_stored_event(&self.event)?)
124 }
125}
126
127impl TryFrom<StoredNostrEvent> for VerifiedStoredNostrEvent {
128 type Error = NostrEventStoreError;
129
130 fn try_from(event: StoredNostrEvent) -> Result<Self, Self::Error> {
131 let event = normalize_signed_event(event)?;
132 let sdk_event = nostr_sdk_event_from_stored_event(&event)?;
133 verify_nostr_sdk_event(&sdk_event)?;
134 Ok(Self { event })
135 }
136}
137
138impl AsRef<StoredNostrEvent> for VerifiedStoredNostrEvent {
139 fn as_ref(&self) -> &StoredNostrEvent {
140 self.as_stored()
141 }
142}
143
144#[derive(Debug, Clone, PartialEq)]
145pub struct ParsedHashtreeRootEvent {
146 pub event: StoredNostrEvent,
147 pub tree_name: String,
148 pub root_cid: Cid,
149 pub visibility: TreeVisibility,
150 pub labels: Vec<String>,
151 pub encrypted_key: Option<String>,
152 pub key_id: Option<String>,
153 pub self_encrypted_key: Option<String>,
154 pub self_encrypted_link_key: Option<String>,
155}
156
157#[derive(Debug, Clone, Default, PartialEq)]
158pub struct NostrEventManifest {
159 pub by_id: Option<Cid>,
160 pub by_author_time: Option<Cid>,
161 pub by_author_kind_time: Option<Cid>,
162 pub by_kind_time: Option<Cid>,
163 pub by_kind_time_author: Option<Cid>,
164 pub by_time: Option<Cid>,
165 pub by_tag: Option<Cid>,
166 pub replaceable: Option<Cid>,
167 pub parameterized_replaceable: Option<Cid>,
168}
169
170#[derive(Debug, Clone, Default)]
171pub struct ListEventsOptions {
172 pub limit: Option<usize>,
173 pub since: Option<u64>,
174 pub until: Option<u64>,
175}
176
177#[derive(Debug, Clone, Default)]
178pub struct ProfileStat {
179 pub label: &'static str,
180 pub count: u64,
181 pub total: Duration,
182 pub max: Duration,
183}
184
185#[derive(Debug, Default)]
186struct ProfileAccumulator {
187 count: u64,
188 total: Duration,
189 max: Duration,
190}
191
192static PROFILE_ENABLED: AtomicBool = AtomicBool::new(false);
193
194thread_local! {
195 static PROFILE_STATE: RefCell<BTreeMap<&'static str, ProfileAccumulator>> =
196 const { RefCell::new(BTreeMap::new()) };
197}
198
199pub fn set_profile_enabled(enabled: bool) {
200 PROFILE_ENABLED.store(enabled, Ordering::Relaxed);
201}
202
203pub fn reset_profile() {
204 PROFILE_STATE.with(|state| state.borrow_mut().clear());
205}
206
207pub fn take_profile() -> Vec<ProfileStat> {
208 PROFILE_STATE.with(|state| {
209 state
210 .borrow()
211 .iter()
212 .map(|(&label, acc)| ProfileStat {
213 label,
214 count: acc.count,
215 total: acc.total,
216 max: acc.max,
217 })
218 .collect()
219 })
220}
221
222pub struct ProfileGuard {
223 label: &'static str,
224 started: Option<Instant>,
225}
226
227impl ProfileGuard {
228 pub fn new(label: &'static str) -> Self {
229 let started = PROFILE_ENABLED.load(Ordering::Relaxed).then(Instant::now);
230 Self { label, started }
231 }
232}
233
234impl Drop for ProfileGuard {
235 fn drop(&mut self) {
236 let Some(started) = self.started else {
237 return;
238 };
239 let elapsed = started.elapsed();
240 PROFILE_STATE.with(|state| {
241 let mut state = state.borrow_mut();
242 let entry = state.entry(self.label).or_default();
243 entry.count += 1;
244 entry.total += elapsed;
245 entry.max = entry.max.max(elapsed);
246 });
247 }
248}
249
250#[derive(Debug, thiserror::Error)]
251pub enum NostrEventStoreError {
252 #[error("hash tree error: {0}")]
253 HashTree(#[from] HashTreeError),
254 #[error("index error: {0}")]
255 Index(#[from] BTreeError),
256 #[error("collection error: {0}")]
257 Collection(#[from] hashtree_collection::CollectionError),
258 #[error("encode error: {0}")]
259 Encode(#[from] rmp_serde::encode::Error),
260 #[error("decode error: {0}")]
261 Decode(#[from] rmp_serde::decode::Error),
262 #[error("json error: {0}")]
263 Json(#[from] serde_json::Error),
264 #[error("{0}")]
265 Validation(String),
266}
267
268const MISSING_STORED_EVENT_BLOB_ERROR: &str = "stored nostr event blob is missing";
269
270fn is_missing_stored_event_error(err: &NostrEventStoreError) -> bool {
271 matches!(
272 err,
273 NostrEventStoreError::Validation(message) if message == MISSING_STORED_EVENT_BLOB_ERROR
274 )
275}
276
277pub struct NostrEventStore<S: Store> {
278 store: Arc<S>,
279 tree: HashTree<S>,
280 index: BTree<S>,
281 options: NostrEventStoreOptions,
282}
283
284#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
285enum ReplaceableSlot {
286 Replaceable,
287 Parameterized,
288}
289
290#[derive(Debug, Clone)]
291struct ExistingReplaceableEvent {
292 event: StoredNostrEvent,
293 cid: Cid,
294}
295
296struct PreparedEventBlob {
297 sequence: usize,
298 event: StoredNostrEvent,
299 bytes: Vec<u8>,
300 previous: Option<StoredNostrEvent>,
301}
302
303#[allow(clippy::large_enum_variant)]
304#[derive(Debug, Clone)]
305enum ReplaceableDecision {
306 Accept {
307 replaced: Option<ExistingReplaceableEvent>,
308 slot: Option<(ReplaceableSlot, String)>,
309 },
310 Reject,
311}
312
313pub fn encode_signed_event_json(event: &StoredNostrEvent) -> Result<Vec<u8>, NostrEventStoreError> {
314 let normalized = normalize_signed_event(event.clone())?;
315 Ok(serde_json::to_vec(&normalized)?)
316}
317
318pub fn decode_signed_event_json(data: &[u8]) -> Result<StoredNostrEvent, NostrEventStoreError> {
319 let decoded: StoredNostrEvent = serde_json::from_slice(data)?;
320 normalize_signed_event(decoded)
321}
322
323pub fn encode_stored_event_msgpack(
324 event: &StoredNostrEvent,
325) -> Result<Vec<u8>, NostrEventStoreError> {
326 let normalized = normalize_signed_event(event.clone())?;
327 Ok(rmp_serde::to_vec(&(
328 NOSTR_EVENT_ENVELOPE_VERSION,
329 normalized.id,
330 normalized.pubkey,
331 normalized.created_at,
332 normalized.kind,
333 normalized.tags,
334 normalized.content,
335 normalized.sig,
336 ))?)
337}
338
339pub fn decode_stored_event_msgpack(data: &[u8]) -> Result<StoredNostrEvent, NostrEventStoreError> {
340 let (version, id, pubkey, created_at, kind, tags, content, sig): (
341 u8,
342 String,
343 String,
344 u64,
345 u32,
346 Vec<Vec<String>>,
347 String,
348 String,
349 ) = rmp_serde::from_slice(data)?;
350
351 if version != NOSTR_EVENT_ENVELOPE_VERSION {
352 return Err(NostrEventStoreError::Validation(format!(
353 "unsupported event envelope version: {version}"
354 )));
355 }
356
357 normalize_signed_event(StoredNostrEvent {
358 id,
359 pubkey,
360 created_at,
361 kind,
362 tags,
363 content,
364 sig,
365 })
366}
367
368pub async fn store_signed_event_snapshot<S: Store>(
369 store: Arc<S>,
370 event: &StoredNostrEvent,
371) -> Result<Cid, NostrEventStoreError> {
372 let bytes = encode_signed_event_json(event)?;
373 let tree = HashTree::new(HashTreeConfig::new(store).public());
374 let (cid, _) = tree.put(&bytes).await?;
375 Ok(cid)
376}
377
378pub async fn read_signed_event_snapshot<S: Store>(
379 store: Arc<S>,
380 snapshot_cid: &Cid,
381 max_bytes: Option<usize>,
382) -> Result<StoredNostrEvent, NostrEventStoreError> {
383 let max_bytes = max_bytes.unwrap_or(MAX_SNAPSHOT_BYTES);
384 let tree = HashTree::new(HashTreeConfig::new(store).public());
385 let data = tree
386 .get(snapshot_cid, Some((max_bytes + 1) as u64))
387 .await?
388 .ok_or_else(|| {
389 NostrEventStoreError::Validation("signed Nostr event snapshot is missing".to_string())
390 })?;
391 if data.len() > max_bytes {
392 return Err(NostrEventStoreError::Validation(format!(
393 "signed Nostr event snapshot exceeds {max_bytes} bytes"
394 )));
395 }
396 decode_signed_event_json(&data)
397}
398
399pub fn stored_event_from_nostr_sdk_event(event: &Event) -> StoredNostrEvent {
400 StoredNostrEvent {
401 id: event.id.to_hex(),
402 pubkey: event.pubkey.to_hex(),
403 created_at: event.created_at.as_secs(),
404 kind: u32::from(event.kind.as_u16()),
405 tags: event
406 .tags
407 .iter()
408 .map(|tag| tag.as_slice().to_vec())
409 .collect(),
410 content: event.content.clone(),
411 sig: event.sig.to_string(),
412 }
413}
414
415fn nostr_sdk_event_from_stored_event(
416 event: &StoredNostrEvent,
417) -> Result<Event, NostrEventStoreError> {
418 let bytes = serde_json::to_vec(event)?;
419 Ok(serde_json::from_slice(&bytes)?)
420}
421
422fn verify_nostr_sdk_event(event: &Event) -> Result<(), NostrEventStoreError> {
423 event.verify().map_err(|err| {
424 NostrEventStoreError::Validation(format!("signature verification failed: {err}"))
425 })
426}
427
428pub fn parse_hashtree_root_event(
429 event: &StoredNostrEvent,
430) -> Result<Option<ParsedHashtreeRootEvent>, NostrEventStoreError> {
431 let normalized = normalize_signed_event(event.clone())?;
432 if normalized.kind != HASHTREE_ROOT_KIND {
433 return Ok(None);
434 }
435
436 let tree_name = normalized
437 .tags
438 .iter()
439 .find(|tag| tag.first().map(String::as_str) == Some("d"))
440 .and_then(|tag| tag.get(1))
441 .cloned();
442 let hash_hex = normalized
443 .tags
444 .iter()
445 .find(|tag| tag.first().map(String::as_str) == Some(TAG_HASH))
446 .and_then(|tag| tag.get(1))
447 .cloned();
448 let (Some(tree_name), Some(hash_hex)) = (tree_name, hash_hex) else {
449 return Ok(None);
450 };
451
452 if has_any_label(&normalized) && !has_label(&normalized, HASHTREE_LABEL) {
453 return Ok(None);
454 }
455
456 let labels = unique_labels(
457 normalized
458 .tags
459 .iter()
460 .filter(|tag| tag.first().map(String::as_str) == Some("l"))
461 .filter_map(|tag| tag.get(1).cloned())
462 .collect(),
463 );
464 let key_hex = normalized
465 .tags
466 .iter()
467 .find(|tag| tag.first().map(String::as_str) == Some(TAG_KEY))
468 .and_then(|tag| tag.get(1))
469 .cloned();
470 let encrypted_key = normalized
471 .tags
472 .iter()
473 .find(|tag| tag.first().map(String::as_str) == Some(TAG_ENCRYPTED_KEY))
474 .and_then(|tag| tag.get(1))
475 .cloned();
476 let key_id = normalized
477 .tags
478 .iter()
479 .find(|tag| tag.first().map(String::as_str) == Some(TAG_KEY_ID))
480 .and_then(|tag| tag.get(1))
481 .cloned();
482 let self_encrypted_key = normalized
483 .tags
484 .iter()
485 .find(|tag| tag.first().map(String::as_str) == Some(TAG_SELF_ENCRYPTED_KEY))
486 .and_then(|tag| tag.get(1))
487 .cloned();
488 let self_encrypted_link_key = normalized
489 .tags
490 .iter()
491 .find(|tag| tag.first().map(String::as_str) == Some(TAG_SELF_ENCRYPTED_LINK_KEY))
492 .and_then(|tag| tag.get(1))
493 .cloned();
494
495 let visibility = if encrypted_key.is_some() {
496 TreeVisibility::LinkVisible
497 } else if self_encrypted_key.is_some() {
498 TreeVisibility::Private
499 } else {
500 TreeVisibility::Public
501 };
502
503 let hash = hashtree_core::from_hex(&hash_hex).map_err(|_| {
504 NostrEventStoreError::Validation(
505 "root hash must be a lowercase 64-character hex string".to_string(),
506 )
507 })?;
508 let key = match (visibility, key_hex) {
509 (TreeVisibility::Public, Some(key_hex)) => {
510 Some(hashtree_core::from_hex(&key_hex).map_err(|_| {
511 NostrEventStoreError::Validation(
512 "root key must be a lowercase 64-character hex string".to_string(),
513 )
514 })?)
515 }
516 _ => None,
517 };
518
519 Ok(Some(ParsedHashtreeRootEvent {
520 event: normalized,
521 tree_name,
522 root_cid: Cid { hash, key },
523 visibility,
524 labels,
525 encrypted_key,
526 key_id,
527 self_encrypted_key,
528 self_encrypted_link_key,
529 }))
530}
531
532pub fn parse_verified_hashtree_root_event(
533 event: &Event,
534) -> Result<Option<ParsedHashtreeRootEvent>, NostrEventStoreError> {
535 let verified = VerifiedEvent::try_from(event.clone())?;
536 parse_hashtree_root_event(verified.to_stored_event().as_stored())
537}
538
539pub fn resolve_self_encrypted_root_cid(
540 parsed: &ParsedHashtreeRootEvent,
541 owner_keys: &Keys,
542) -> Result<Cid, NostrEventStoreError> {
543 if parsed.root_cid.key.is_some() {
544 return Ok(parsed.root_cid.clone());
545 }
546
547 let ciphertext = parsed.self_encrypted_key.as_ref().ok_or_else(|| {
548 NostrEventStoreError::Validation("hashtree root key is unavailable".to_string())
549 })?;
550 let author = PublicKey::from_hex(&parsed.event.pubkey).map_err(|err| {
551 NostrEventStoreError::Validation(format!("invalid root event pubkey: {err}"))
552 })?;
553 if owner_keys.public_key() != author {
554 return Err(NostrEventStoreError::Validation(format!(
555 "owner key {} does not match root event author {}",
556 owner_keys.public_key().to_hex(),
557 parsed.event.pubkey
558 )));
559 }
560 let key_hex = nip44::decrypt(owner_keys.secret_key(), &author, ciphertext).map_err(|_| {
561 NostrEventStoreError::Validation("hashtree root key is unavailable".to_string())
562 })?;
563 let key = hashtree_core::from_hex(&key_hex).map_err(|_| {
564 NostrEventStoreError::Validation(
565 "root key must be a lowercase 64-character hex string".to_string(),
566 )
567 })?;
568 Ok(Cid {
569 hash: parsed.root_cid.hash,
570 key: Some(key),
571 })
572}
573
574pub fn build_private_hashtree_root_event(
575 owner_keys: &Keys,
576 tree_name: &str,
577 root_cid: &Cid,
578 created_at: Option<u64>,
579) -> Result<Event, NostrEventStoreError> {
580 let Some(root_key) = root_cid.key else {
581 return Err(NostrEventStoreError::Validation(
582 "private hashtree root requires an encrypted CID".to_string(),
583 ));
584 };
585 let root_key_hex = hex::encode(root_key);
586 let self_encrypted_key = nip44::encrypt(
587 owner_keys.secret_key(),
588 &owner_keys.public_key(),
589 root_key_hex,
590 Nip44Version::V2,
591 )
592 .map_err(|err| {
593 NostrEventStoreError::Validation(format!("self-encrypted root key failed: {err}"))
594 })?;
595 let builder = EventBuilder::new(Kind::from(HASHTREE_ROOT_KIND as u16), "").tags([
596 Tag::identifier(tree_name.to_string()),
597 Tag::custom(
598 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
599 vec![HASHTREE_LABEL],
600 ),
601 Tag::custom(
602 TagKind::Custom(TAG_HASH.into()),
603 vec![hex::encode(root_cid.hash)],
604 ),
605 Tag::custom(
606 TagKind::Custom(TAG_SELF_ENCRYPTED_KEY.into()),
607 vec![self_encrypted_key],
608 ),
609 ]);
610 let builder = if let Some(created_at) = created_at {
611 builder.custom_created_at(nostr_sdk::Timestamp::from(created_at))
612 } else {
613 builder
614 };
615 builder.sign_with_keys(owner_keys).map_err(|err| {
616 NostrEventStoreError::Validation(format!("build hashtree root event failed: {err}"))
617 })
618}
619
620#[derive(Debug, Clone, Default)]
621pub struct NostrEventStoreOptions {
622 pub btree_order: Option<usize>,
623}
624
625fn nostr_collection_definition() -> CollectionDefinition<StoredNostrEvent> {
626 CollectionDefinition::new(|event: &StoredNostrEvent| event.id.clone())
627 .with_published_schema(
628 CollectionPublishedSchema::new()
629 .with_item_format(NOSTR_EVENT_ITEM_FORMAT)
630 .with_projection_format(NOSTR_EVENT_PROJECTION_FORMAT),
631 )
632 .with_key_index(MANIFEST_BY_AUTHOR_TIME, |event| {
633 vec![author_time_key(event)]
634 })
635 .with_key_index(MANIFEST_BY_AUTHOR_KIND_TIME, |event| {
636 vec![author_kind_time_key(event)]
637 })
638 .with_key_index(MANIFEST_BY_KIND_TIME, |event| vec![kind_time_key(event)])
639 .with_key_index(MANIFEST_BY_KIND_TIME_AUTHOR, |event| {
640 vec![kind_time_author_key(event)]
641 })
642 .with_key_index(MANIFEST_BY_TIME, |event| vec![time_key(event)])
643 .with_key_index(MANIFEST_BY_TAG, tag_keys)
644 .with_key_index(MANIFEST_REPLACEABLE, |event| {
645 if is_replaceable_kind(event.kind) {
646 vec![replaceable_key(&event.pubkey, event.kind)]
647 } else {
648 Vec::new()
649 }
650 })
651 .with_key_index(MANIFEST_PARAMETERIZED_REPLACEABLE, |event| {
652 if is_parameterized_replaceable_kind(event.kind) {
653 vec![parameterized_replaceable_key(
654 &event.pubkey,
655 event.kind,
656 ¶meterized_replaceable_d_tag(event),
657 )]
658 } else {
659 Vec::new()
660 }
661 })
662}
663
664fn nostr_collection_options(options: &NostrEventStoreOptions) -> CollectionOptions {
665 CollectionOptions {
666 btree_order: options.btree_order,
667 }
668}
669
670fn collection_state_from_nostr_manifest(manifest: &NostrEventManifest) -> CollectionState {
671 let mut key_roots = BTreeMap::new();
672 key_roots.insert(
673 MANIFEST_BY_AUTHOR_TIME.to_string(),
674 manifest.by_author_time.clone(),
675 );
676 key_roots.insert(
677 MANIFEST_BY_AUTHOR_KIND_TIME.to_string(),
678 manifest.by_author_kind_time.clone(),
679 );
680 key_roots.insert(
681 MANIFEST_BY_KIND_TIME.to_string(),
682 manifest.by_kind_time.clone(),
683 );
684 key_roots.insert(
685 MANIFEST_BY_KIND_TIME_AUTHOR.to_string(),
686 manifest.by_kind_time_author.clone(),
687 );
688 key_roots.insert(MANIFEST_BY_TIME.to_string(), manifest.by_time.clone());
689 key_roots.insert(MANIFEST_BY_TAG.to_string(), manifest.by_tag.clone());
690 key_roots.insert(
691 MANIFEST_REPLACEABLE.to_string(),
692 manifest.replaceable.clone(),
693 );
694 key_roots.insert(
695 MANIFEST_PARAMETERIZED_REPLACEABLE.to_string(),
696 manifest.parameterized_replaceable.clone(),
697 );
698 CollectionState {
699 by_id_root: manifest.by_id.clone(),
700 key_roots,
701 search_roots: BTreeMap::new(),
702 }
703}
704
705fn nostr_manifest_from_collection_state(state: &CollectionState) -> NostrEventManifest {
706 NostrEventManifest {
707 by_id: state.by_id_root.clone(),
708 by_author_time: state.key_root(MANIFEST_BY_AUTHOR_TIME).cloned(),
709 by_author_kind_time: state.key_root(MANIFEST_BY_AUTHOR_KIND_TIME).cloned(),
710 by_kind_time: state.key_root(MANIFEST_BY_KIND_TIME).cloned(),
711 by_kind_time_author: state.key_root(MANIFEST_BY_KIND_TIME_AUTHOR).cloned(),
712 by_time: state.key_root(MANIFEST_BY_TIME).cloned(),
713 by_tag: state.key_root(MANIFEST_BY_TAG).cloned(),
714 replaceable: state.key_root(MANIFEST_REPLACEABLE).cloned(),
715 parameterized_replaceable: state.key_root(MANIFEST_PARAMETERIZED_REPLACEABLE).cloned(),
716 }
717}
718
719impl<S: Store> NostrEventStore<S> {
720 pub fn new(store: Arc<S>) -> Self {
721 Self::with_options(store, NostrEventStoreOptions::default())
722 }
723
724 pub fn with_options(store: Arc<S>, options: NostrEventStoreOptions) -> Self {
725 Self {
726 store: Arc::clone(&store),
727 tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
728 index: BTree::new(
729 store,
730 BTreeOptions {
731 order: options.btree_order,
732 },
733 ),
734 options,
735 }
736 }
737
738 pub fn encode_event(&self, event: &StoredNostrEvent) -> Result<Vec<u8>, NostrEventStoreError> {
739 encode_stored_event_msgpack(&self.validate_event_shape(event.clone())?)
740 }
741
742 pub fn decode_event(&self, data: &[u8]) -> Result<StoredNostrEvent, NostrEventStoreError> {
743 self.validate_event_shape(decode_stored_event_msgpack(data)?)
744 }
745
746 pub fn decode_verified_event(
747 &self,
748 data: &[u8],
749 ) -> Result<VerifiedStoredNostrEvent, NostrEventStoreError> {
750 VerifiedStoredNostrEvent::try_from(self.decode_event(data)?)
751 }
752
753 pub async fn validate_index_root(
754 &self,
755 root: Option<&Cid>,
756 ) -> Result<(), NostrEventStoreError> {
757 let Some(root) = root else {
758 return Ok(());
759 };
760
761 let manifest = self.get_manifest(Some(root)).await?;
762 let mut missing = Vec::new();
763 if manifest.by_id.is_none() {
764 missing.push("by-id");
765 }
766 if manifest.by_author_time.is_none() {
767 missing.push(MANIFEST_BY_AUTHOR_TIME);
768 }
769 if manifest.by_author_kind_time.is_none() {
770 missing.push(MANIFEST_BY_AUTHOR_KIND_TIME);
771 }
772 if manifest.by_kind_time.is_none() {
773 missing.push(MANIFEST_BY_KIND_TIME);
774 }
775 if manifest.by_time.is_none() {
776 missing.push(MANIFEST_BY_TIME);
777 }
778 if !missing.is_empty() {
779 return Err(NostrEventStoreError::Validation(format!(
780 "nostr event index root missing required manifest entries: {}",
781 missing.join(", ")
782 )));
783 }
784
785 if let Some(metadata) =
786 load_collection_manifest_metadata(Arc::clone(&self.store), Some(root)).await?
787 {
788 let schema = metadata.published_schema();
789 if let Some(item_format) = schema.and_then(|schema| schema.item_format()) {
790 if item_format != NOSTR_EVENT_ITEM_FORMAT {
791 return Err(NostrEventStoreError::Validation(format!(
792 "nostr event index item format mismatch: expected {NOSTR_EVENT_ITEM_FORMAT}, got {item_format}"
793 )));
794 }
795 }
796 if let Some(projection_format) = schema.and_then(|schema| schema.projection_format()) {
797 if projection_format != NOSTR_EVENT_PROJECTION_FORMAT {
798 return Err(NostrEventStoreError::Validation(format!(
799 "nostr event index projection format mismatch: expected {NOSTR_EVENT_PROJECTION_FORMAT}, got {projection_format}"
800 )));
801 }
802 }
803 }
804
805 Ok(())
806 }
807
808 pub async fn add(
809 &self,
810 root: Option<&Cid>,
811 event: StoredNostrEvent,
812 ) -> Result<Cid, NostrEventStoreError> {
813 let buffered_store = Arc::new(BufferedStore::new_optimistic(Arc::clone(&self.store)));
814 let buffered_writer =
815 NostrEventStore::with_options(Arc::clone(&buffered_store), self.options.clone());
816 let normalized = {
817 let _profile = ProfileGuard::new("nostr.add.validate_event");
818 buffered_writer.validate_event(event).await?
819 };
820 let mut manifest = {
821 let _profile = ProfileGuard::new("nostr.add.get_manifest");
822 buffered_writer.get_manifest(root).await?
823 };
824 let mut obsolete_event_cids = Vec::new();
825 buffered_writer
826 .insert_into_manifest(&mut manifest, normalized, &mut obsolete_event_cids)
827 .await?;
828
829 let Some(root) = ({
830 let _profile = ProfileGuard::new("nostr.add.write_manifest");
831 buffered_writer.write_manifest(&manifest).await?
832 }) else {
833 return Err(NostrEventStoreError::Validation(
834 "failed to write event manifest".to_string(),
835 ));
836 };
837 {
838 let _profile = ProfileGuard::new("nostr.add.flush");
839 buffered_store.flush().await.map_err(|err| {
840 NostrEventStoreError::HashTree(HashTreeError::Store(err.to_string()))
841 })?;
842 }
843 self.delete_obsolete_event_blobs(&obsolete_event_cids)
844 .await?;
845 Ok(root)
846 }
847
848 pub async fn build<I>(
849 &self,
850 root: Option<&Cid>,
851 events: I,
852 ) -> Result<Option<Cid>, NostrEventStoreError>
853 where
854 I: IntoIterator<Item = StoredNostrEvent>,
855 {
856 let mut events: Vec<StoredNostrEvent> = events.into_iter().collect();
857 if events.is_empty() {
858 return Ok(root.cloned());
859 }
860 events = retain_latest_replaceable_events(events);
861 events.sort_by(|left, right| match compare_events(left, right) {
862 x if x < 0 => std::cmp::Ordering::Less,
863 x if x > 0 => std::cmp::Ordering::Greater,
864 _ => std::cmp::Ordering::Equal,
865 });
866
867 let buffered_store = Arc::new(BufferedStore::new_optimistic(Arc::clone(&self.store)));
868 let buffered_writer =
869 NostrEventStore::with_options(Arc::clone(&buffered_store), self.options.clone());
870 let mut obsolete_event_cids = Vec::new();
871 let next_root = if root.is_none() {
872 buffered_writer.build_manifest_from_events(events).await?
873 } else {
874 let mut manifest = buffered_writer.get_manifest(root).await?;
875 let mut prepared_non_replaceable = Vec::new();
876 for (sequence, event) in events.into_iter().enumerate() {
877 let normalized = buffered_writer.validate_event(event).await?;
878 if is_replaceable_kind(normalized.kind)
879 || is_parameterized_replaceable_kind(normalized.kind)
880 {
881 buffered_writer
882 .insert_into_manifest(&mut manifest, normalized, &mut obsolete_event_cids)
883 .await?;
884 } else {
885 let previous = match buffered_writer
886 .manifest_event_cid(&manifest, &normalized.id)
887 .await?
888 {
889 Some(existing_cid) => {
890 match buffered_writer.read_stored_event(&existing_cid).await {
891 Ok(_) => continue,
892 Err(err) if is_missing_stored_event_error(&err) => {
893 Some(normalized.clone())
894 }
895 Err(err) => return Err(err),
896 }
897 }
898 None => None,
899 };
900 prepared_non_replaceable
901 .push(buffered_writer.prepare_event_blob(sequence, normalized, previous)?);
902 }
903 }
904
905 let indexed_events = buffered_writer
906 .put_prepared_event_blobs_parallel(prepared_non_replaceable)
907 .await?;
908 if !indexed_events.is_empty() {
909 let mut collection = buffered_writer.collection_writer_from_manifest(&manifest);
910 for (_sequence, event, event_cid, previous) in indexed_events {
911 collection
912 .put(&event, &event_cid, previous.as_ref())
913 .await?;
914 }
915 manifest = nostr_manifest_from_collection_state(collection.state());
916 }
917 buffered_writer.write_manifest(&manifest).await?
918 };
919 buffered_store
920 .flush()
921 .await
922 .map_err(|err| NostrEventStoreError::HashTree(HashTreeError::Store(err.to_string())))?;
923 self.delete_obsolete_event_blobs(&obsolete_event_cids)
924 .await?;
925 Ok(next_root)
926 }
927
928 pub async fn upgrade_manifest_indexes(
929 &self,
930 root: Option<&Cid>,
931 ) -> Result<Option<Cid>, NostrEventStoreError> {
932 let Some(root) = root else {
933 return Ok(None);
934 };
935
936 let manifest = self.get_manifest(Some(root)).await?;
937 let buffered_store = Arc::new(BufferedStore::new_optimistic(Arc::clone(&self.store)));
938 let buffered_writer =
939 NostrEventStore::with_options(Arc::clone(&buffered_store), self.options.clone());
940
941 let Some(next_manifest) = buffered_writer
942 .upgrade_manifest_with_missing_indexes(&manifest)
943 .await?
944 else {
945 return Ok(Some(root.clone()));
946 };
947
948 let next_root = buffered_writer.write_manifest(&next_manifest).await?;
949 buffered_store
950 .flush()
951 .await
952 .map_err(|err| NostrEventStoreError::HashTree(HashTreeError::Store(err.to_string())))?;
953 Ok(next_root)
954 }
955
956 pub async fn get_by_id(
957 &self,
958 root: Option<&Cid>,
959 event_id: &str,
960 ) -> Result<Option<StoredNostrEvent>, NostrEventStoreError> {
961 validate_lower_hex(event_id, 64, "event id")?;
962 let manifest = self.get_manifest(root).await?;
963 let source = self.collection_source_from_manifest(&manifest);
964 let Some(event_cid) = source.get(event_id).await? else {
965 return Ok(None);
966 };
967 match self.read_stored_event(&event_cid).await {
968 Ok(event) => Ok(Some(event)),
969 Err(err) if is_missing_stored_event_error(&err) => Ok(None),
970 Err(err) => Err(err),
971 }
972 }
973
974 pub async fn get_verified_by_id(
975 &self,
976 root: Option<&Cid>,
977 event_id: &str,
978 ) -> Result<Option<VerifiedStoredNostrEvent>, NostrEventStoreError> {
979 self.get_by_id(root, event_id)
980 .await?
981 .map(VerifiedStoredNostrEvent::try_from)
982 .transpose()
983 }
984
985 pub async fn list_by_author(
986 &self,
987 root: Option<&Cid>,
988 pubkey: &str,
989 options: ListEventsOptions,
990 ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
991 let manifest = self.get_manifest(root).await?;
992 let Some(by_author_time) = manifest.by_author_time.as_ref() else {
993 return Ok(Vec::new());
994 };
995 self.collect_events(
996 by_author_time,
997 &format!("{}:", validate_lower_hex(pubkey, 64, "pubkey")?),
998 &options,
999 )
1000 .await
1001 }
1002
1003 pub async fn list_by_author_lossy(
1004 &self,
1005 root: Option<&Cid>,
1006 pubkey: &str,
1007 options: ListEventsOptions,
1008 ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1009 let manifest = self.get_manifest(root).await?;
1010 let Some(by_author_time) = manifest.by_author_time.as_ref() else {
1011 return Ok(Vec::new());
1012 };
1013 self.collect_events_lossy(
1014 by_author_time,
1015 &format!("{}:", validate_lower_hex(pubkey, 64, "pubkey")?),
1016 &options,
1017 )
1018 .await
1019 }
1020
1021 pub async fn list_by_author_and_kind(
1022 &self,
1023 root: Option<&Cid>,
1024 pubkey: &str,
1025 kind: u32,
1026 options: ListEventsOptions,
1027 ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1028 let manifest = self.get_manifest(root).await?;
1029 let Some(by_author_kind_time) = manifest.by_author_kind_time.as_ref() else {
1030 return Ok(Vec::new());
1031 };
1032
1033 self.collect_events(
1034 by_author_kind_time,
1035 &format!(
1036 "{}:{}:",
1037 validate_lower_hex(pubkey, 64, "pubkey")?,
1038 pad_kind(kind)
1039 ),
1040 &options,
1041 )
1042 .await
1043 }
1044
1045 pub async fn list_by_kind(
1046 &self,
1047 root: Option<&Cid>,
1048 kind: u32,
1049 options: ListEventsOptions,
1050 ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1051 let manifest = self.get_manifest(root).await?;
1052 let Some(by_kind_time) = manifest.by_kind_time.as_ref() else {
1053 return Ok(Vec::new());
1054 };
1055
1056 self.collect_events(by_kind_time, &format!("{}:", pad_kind(kind)), &options)
1057 .await
1058 }
1059
1060 pub async fn list_by_kind_lossy(
1061 &self,
1062 root: Option<&Cid>,
1063 kind: u32,
1064 options: ListEventsOptions,
1065 ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1066 let manifest = self.get_manifest(root).await?;
1067 let Some(by_kind_time) = manifest.by_kind_time.as_ref() else {
1068 return Ok(Vec::new());
1069 };
1070
1071 self.collect_events_lossy(by_kind_time, &format!("{}:", pad_kind(kind)), &options)
1072 .await
1073 }
1074
1075 pub async fn get_replaceable(
1076 &self,
1077 root: Option<&Cid>,
1078 pubkey: &str,
1079 kind: u32,
1080 ) -> Result<Option<StoredNostrEvent>, NostrEventStoreError> {
1081 let manifest = self.get_manifest(root).await?;
1082 let key = replaceable_key(&validate_lower_hex(pubkey, 64, "pubkey")?, kind);
1083 let source = self.collection_source_from_manifest(&manifest);
1084 let Some(cid) = source.get_index_link(MANIFEST_REPLACEABLE, &key).await? else {
1085 return Ok(None);
1086 };
1087 match self.read_stored_event(&cid).await {
1088 Ok(event) => Ok(Some(event)),
1089 Err(err) if is_missing_stored_event_error(&err) => Ok(None),
1090 Err(err) => Err(err),
1091 }
1092 }
1093
1094 pub async fn list_recent(
1095 &self,
1096 root: Option<&Cid>,
1097 options: ListEventsOptions,
1098 ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1099 let manifest = self.get_manifest(root).await?;
1100 let Some(by_time) = manifest.by_time.as_ref() else {
1101 return Ok(Vec::new());
1102 };
1103 self.collect_events(by_time, "", &options).await
1104 }
1105
1106 pub async fn list_recent_lossy(
1107 &self,
1108 root: Option<&Cid>,
1109 options: ListEventsOptions,
1110 ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1111 let manifest = self.get_manifest(root).await?;
1112 let Some(by_time) = manifest.by_time.as_ref() else {
1113 return Ok(Vec::new());
1114 };
1115 self.collect_events_lossy(by_time, "", &options).await
1116 }
1117
1118 pub async fn list_by_tag(
1119 &self,
1120 root: Option<&Cid>,
1121 tag_name: &str,
1122 tag_value: &str,
1123 options: ListEventsOptions,
1124 ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1125 let manifest = self.get_manifest(root).await?;
1126 let Some(by_tag) = manifest.by_tag.as_ref() else {
1127 return Ok(Vec::new());
1128 };
1129 let prefix = tag_prefix(tag_name, tag_value)?;
1130 self.collect_events(by_tag, &prefix, &options).await
1131 }
1132
1133 pub async fn get_parameterized_replaceable(
1134 &self,
1135 root: Option<&Cid>,
1136 pubkey: &str,
1137 kind: u32,
1138 d_tag: &str,
1139 ) -> Result<Option<StoredNostrEvent>, NostrEventStoreError> {
1140 let manifest = self.get_manifest(root).await?;
1141 let key =
1142 parameterized_replaceable_key(&validate_lower_hex(pubkey, 64, "pubkey")?, kind, d_tag);
1143 let source = self.collection_source_from_manifest(&manifest);
1144 let Some(cid) = source
1145 .get_index_link(MANIFEST_PARAMETERIZED_REPLACEABLE, &key)
1146 .await?
1147 else {
1148 return Ok(None);
1149 };
1150 match self.read_stored_event(&cid).await {
1151 Ok(event) => Ok(Some(event)),
1152 Err(err) if is_missing_stored_event_error(&err) => Ok(None),
1153 Err(err) => Err(err),
1154 }
1155 }
1156
1157 pub async fn get_manifest(
1158 &self,
1159 root: Option<&Cid>,
1160 ) -> Result<NostrEventManifest, NostrEventStoreError> {
1161 let definition = nostr_collection_definition();
1162 let state = load_collection_state(Arc::clone(&self.store), &definition, root).await?;
1163 Ok(nostr_manifest_from_collection_state(&state))
1164 }
1165
1166 async fn collect_events(
1167 &self,
1168 root: &Cid,
1169 prefix: &str,
1170 options: &ListEventsOptions,
1171 ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1172 let mut events = Vec::new();
1173 let entries = if prefix.is_empty() {
1174 match options.limit {
1175 Some(limit) => self.index.links_entries_limited(Some(root), limit).await?,
1176 None => self.index.links_entries(Some(root)).await?,
1177 }
1178 } else {
1179 match options.limit {
1180 Some(limit) => self.index.prefix_links_limited(root, prefix, limit).await?,
1181 None => self.index.prefix_links(root, prefix).await?,
1182 }
1183 };
1184 for (key, cid) in entries {
1185 let created_at = created_at_from_index_key(&key)?;
1186 if options.until.is_some_and(|until| created_at > until) {
1187 continue;
1188 }
1189 if options.since.is_some_and(|since| created_at < since) {
1190 break;
1191 }
1192 events.push(self.read_stored_event(&cid).await?);
1193 if options.limit.is_some_and(|limit| events.len() >= limit) {
1194 break;
1195 }
1196 }
1197 Ok(events)
1198 }
1199
1200 async fn collect_events_lossy(
1201 &self,
1202 root: &Cid,
1203 prefix: &str,
1204 options: &ListEventsOptions,
1205 ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1206 let mut events = Vec::new();
1207 let entries = if prefix.is_empty() {
1208 match options.limit {
1209 Some(limit) => self.index.links_entries_limited(Some(root), limit).await?,
1210 None => self.index.links_entries(Some(root)).await?,
1211 }
1212 } else {
1213 match options.limit {
1214 Some(limit) => self.index.prefix_links_limited(root, prefix, limit).await?,
1215 None => self.index.prefix_links(root, prefix).await?,
1216 }
1217 };
1218 for (key, cid) in entries {
1219 let created_at = created_at_from_index_key(&key)?;
1220 if options.until.is_some_and(|until| created_at > until) {
1221 continue;
1222 }
1223 if options.since.is_some_and(|since| created_at < since) {
1224 break;
1225 }
1226 match self.read_stored_event(&cid).await {
1227 Ok(event) => events.push(event),
1228 Err(NostrEventStoreError::Validation(message))
1229 if message == MISSING_STORED_EVENT_BLOB_ERROR => {}
1230 Err(err) => return Err(err),
1231 }
1232 if options.limit.is_some_and(|limit| events.len() >= limit) {
1233 break;
1234 }
1235 }
1236 Ok(events)
1237 }
1238
1239 async fn read_stored_event(&self, cid: &Cid) -> Result<StoredNostrEvent, NostrEventStoreError> {
1240 let Some(data) = self.tree.get(cid, None).await? else {
1241 return Err(NostrEventStoreError::Validation(
1242 MISSING_STORED_EVENT_BLOB_ERROR.to_string(),
1243 ));
1244 };
1245 self.decode_event(&data)
1246 }
1247
1248 async fn manifest_event_cid(
1249 &self,
1250 manifest: &NostrEventManifest,
1251 id: &str,
1252 ) -> Result<Option<Cid>, NostrEventStoreError> {
1253 self.collection_source_from_manifest(manifest)
1254 .get(id)
1255 .await
1256 .map_err(Into::into)
1257 }
1258
1259 fn prepare_event_blob(
1260 &self,
1261 sequence: usize,
1262 event: StoredNostrEvent,
1263 previous: Option<StoredNostrEvent>,
1264 ) -> Result<PreparedEventBlob, NostrEventStoreError> {
1265 let bytes = self.encode_validated_event(&event)?;
1266 Ok(PreparedEventBlob {
1267 sequence,
1268 event,
1269 bytes,
1270 previous,
1271 })
1272 }
1273
1274 async fn put_prepared_event_blobs_parallel(
1275 &self,
1276 prepared_events: Vec<PreparedEventBlob>,
1277 ) -> Result<Vec<(usize, StoredNostrEvent, Cid, Option<StoredNostrEvent>)>, NostrEventStoreError>
1278 {
1279 let tree = &self.tree;
1280 let mut indexed_events =
1281 stream::iter(prepared_events.into_iter().map(|prepared| async move {
1282 let (event_cid, _size) = tree.put_file(&prepared.bytes).await?;
1283 Ok::<_, NostrEventStoreError>((
1284 prepared.sequence,
1285 prepared.event,
1286 event_cid,
1287 prepared.previous,
1288 ))
1289 }))
1290 .buffer_unordered(EVENT_BLOB_WRITE_CONCURRENCY)
1291 .try_collect::<Vec<_>>()
1292 .await?;
1293
1294 indexed_events.sort_by_key(|(sequence, _, _, _)| *sequence);
1295 Ok(indexed_events)
1296 }
1297
1298 async fn insert_into_manifest(
1299 &self,
1300 manifest: &mut NostrEventManifest,
1301 normalized: StoredNostrEvent,
1302 obsolete_event_cids: &mut Vec<Cid>,
1303 ) -> Result<(), NostrEventStoreError> {
1304 let replaceable = self
1305 .resolve_replaceable_decision(manifest, &normalized)
1306 .await?;
1307 let (_replaceable_slot, replaced_existing) = match replaceable {
1308 ReplaceableDecision::Reject => return Ok(()),
1309 ReplaceableDecision::Accept { replaced, slot } => (slot, replaced),
1310 };
1311
1312 let event_bytes = {
1313 let _profile = ProfileGuard::new("nostr.add.encode_event");
1314 self.encode_validated_event(&normalized)?
1315 };
1316 let (event_cid, _size) = {
1317 let _profile = ProfileGuard::new("nostr.add.put_file");
1318 self.tree.put_file(&event_bytes).await?
1319 };
1320
1321 let mut collection = self.collection_writer_from_manifest(manifest);
1322 {
1323 let _profile = ProfileGuard::new("nostr.add.index.collection");
1324 collection
1325 .put(
1326 &normalized,
1327 &event_cid,
1328 replaced_existing.as_ref().map(|existing| &existing.event),
1329 )
1330 .await?;
1331 }
1332 *manifest = nostr_manifest_from_collection_state(collection.state());
1333
1334 if let Some(existing) = replaced_existing.as_ref() {
1335 obsolete_event_cids.push(existing.cid.clone());
1336 }
1337
1338 Ok(())
1339 }
1340
1341 async fn build_manifest_from_events(
1342 &self,
1343 events: Vec<StoredNostrEvent>,
1344 ) -> Result<Option<Cid>, NostrEventStoreError> {
1345 let mut prepared_events = Vec::with_capacity(events.len());
1346
1347 for (sequence, event) in events.into_iter().enumerate() {
1348 let normalized = self.validate_event(event).await?;
1349 prepared_events.push(self.prepare_event_blob(sequence, normalized, None)?);
1350 }
1351 let indexed_events = self
1352 .put_prepared_event_blobs_parallel(prepared_events)
1353 .await?
1354 .into_iter()
1355 .map(|(_sequence, event, event_cid, _previous)| (event, event_cid))
1356 .collect::<Vec<_>>();
1357
1358 let mut collection = CollectionWriter::with_options(
1359 Arc::clone(&self.store),
1360 nostr_collection_definition(),
1361 nostr_collection_options(&self.options),
1362 );
1363 collection.rebuild(indexed_events).await?;
1364 collection.write_root().await.map_err(Into::into)
1365 }
1366
1367 async fn upgrade_manifest_with_missing_indexes(
1368 &self,
1369 manifest: &NostrEventManifest,
1370 ) -> Result<Option<NostrEventManifest>, NostrEventStoreError> {
1371 let mut next_manifest = manifest.clone();
1372 let mut changed = false;
1373
1374 if next_manifest.by_kind_time_author.is_none() {
1375 if let Some(by_author_kind_time_root) = manifest.by_author_kind_time.as_ref() {
1376 let entries = self
1377 .index
1378 .links_entries(Some(by_author_kind_time_root))
1379 .await?;
1380 let mut derived = BTreeMap::new();
1381 for (key, cid) in entries {
1382 derived.insert(kind_time_author_key_from_author_kind_time_key(&key)?, cid);
1383 }
1384 next_manifest.by_kind_time_author = self.index.build_links(derived).await?;
1385 changed = true;
1386 }
1387 }
1388
1389 if changed {
1390 Ok(Some(next_manifest))
1391 } else {
1392 Ok(None)
1393 }
1394 }
1395
1396 async fn resolve_replaceable_decision(
1397 &self,
1398 manifest: &NostrEventManifest,
1399 event: &StoredNostrEvent,
1400 ) -> Result<ReplaceableDecision, NostrEventStoreError> {
1401 let slot = if is_replaceable_kind(event.kind) {
1402 Some((
1403 ReplaceableSlot::Replaceable,
1404 replaceable_key(&event.pubkey, event.kind),
1405 ))
1406 } else if is_parameterized_replaceable_kind(event.kind) {
1407 Some((
1408 ReplaceableSlot::Parameterized,
1409 parameterized_replaceable_key(
1410 &event.pubkey,
1411 event.kind,
1412 ¶meterized_replaceable_d_tag(event),
1413 ),
1414 ))
1415 } else {
1416 None
1417 };
1418
1419 let Some((slot_kind, key)) = slot else {
1420 return Ok(ReplaceableDecision::Accept {
1421 replaced: None,
1422 slot: None,
1423 });
1424 };
1425
1426 let source = self.collection_source_from_manifest(manifest);
1427 let existing_cid = match slot_kind {
1428 ReplaceableSlot::Replaceable => {
1429 source.get_index_link(MANIFEST_REPLACEABLE, &key).await?
1430 }
1431 ReplaceableSlot::Parameterized => {
1432 source
1433 .get_index_link(MANIFEST_PARAMETERIZED_REPLACEABLE, &key)
1434 .await?
1435 }
1436 };
1437
1438 let Some(existing_cid) = existing_cid else {
1439 return Ok(ReplaceableDecision::Accept {
1440 replaced: None,
1441 slot: Some((slot_kind, key)),
1442 });
1443 };
1444
1445 let existing = match self.read_stored_event(&existing_cid).await {
1446 Ok(existing) => existing,
1447 Err(err) if is_missing_stored_event_error(&err) => {
1448 return Ok(ReplaceableDecision::Accept {
1449 replaced: None,
1450 slot: Some((slot_kind, key)),
1451 });
1452 }
1453 Err(err) => return Err(err),
1454 };
1455 if compare_events(event, &existing) > 0 {
1456 return Ok(ReplaceableDecision::Accept {
1457 replaced: Some(ExistingReplaceableEvent {
1458 event: existing,
1459 cid: existing_cid,
1460 }),
1461 slot: Some((slot_kind, key)),
1462 });
1463 }
1464
1465 Ok(ReplaceableDecision::Reject)
1466 }
1467
1468 async fn delete_obsolete_event_blobs(
1469 &self,
1470 obsolete_event_cids: &[Cid],
1471 ) -> Result<(), NostrEventStoreError> {
1472 for cid in obsolete_event_cids {
1473 self.store.delete(&cid.hash).await.map_err(|err| {
1474 NostrEventStoreError::HashTree(HashTreeError::Store(err.to_string()))
1475 })?;
1476 }
1477 Ok(())
1478 }
1479
1480 async fn write_manifest(
1481 &self,
1482 manifest: &NostrEventManifest,
1483 ) -> Result<Option<Cid>, NostrEventStoreError> {
1484 let collection = self.collection_writer_from_manifest(manifest);
1485 Ok(collection.write_root().await?)
1486 }
1487
1488 fn collection_source_from_manifest(
1489 &self,
1490 manifest: &NostrEventManifest,
1491 ) -> CollectionSource<S> {
1492 CollectionSource::new(
1493 Arc::clone(&self.store),
1494 collection_state_from_nostr_manifest(manifest),
1495 )
1496 }
1497
1498 fn collection_writer_from_manifest(
1499 &self,
1500 manifest: &NostrEventManifest,
1501 ) -> CollectionWriter<S, StoredNostrEvent> {
1502 CollectionWriter::with_state_and_options(
1503 Arc::clone(&self.store),
1504 nostr_collection_definition(),
1505 collection_state_from_nostr_manifest(manifest),
1506 nostr_collection_options(&self.options),
1507 )
1508 }
1509
1510 async fn validate_event(
1511 &self,
1512 event: StoredNostrEvent,
1513 ) -> Result<StoredNostrEvent, NostrEventStoreError> {
1514 let normalized = self.validate_event_shape(event)?;
1515 let payload = serde_json::to_string(&(
1516 0u8,
1517 normalized.pubkey.clone(),
1518 normalized.created_at,
1519 normalized.kind,
1520 normalized.tags.clone(),
1521 normalized.content.clone(),
1522 ))?;
1523 let computed = hex::encode(sha256(payload.as_bytes()));
1524 if computed != normalized.id {
1525 return Err(NostrEventStoreError::Validation(
1526 "event id does not match canonical nostr payload".to_string(),
1527 ));
1528 }
1529 Ok(normalized)
1530 }
1531
1532 fn encode_validated_event(
1533 &self,
1534 event: &StoredNostrEvent,
1535 ) -> Result<Vec<u8>, NostrEventStoreError> {
1536 encode_stored_event_msgpack(event)
1537 }
1538
1539 fn validate_event_shape(
1540 &self,
1541 event: StoredNostrEvent,
1542 ) -> Result<StoredNostrEvent, NostrEventStoreError> {
1543 normalize_signed_event(event)
1544 }
1545}
1546
1547fn normalize_signed_event(
1548 event: StoredNostrEvent,
1549) -> Result<StoredNostrEvent, NostrEventStoreError> {
1550 Ok(StoredNostrEvent {
1551 id: validate_lower_hex(&event.id, 64, "event id")?,
1552 pubkey: validate_lower_hex(&event.pubkey, 64, "pubkey")?,
1553 created_at: event.created_at,
1554 kind: event.kind,
1555 tags: event.tags,
1556 content: event.content,
1557 sig: validate_lower_hex(&event.sig, 128, "signature")?,
1558 })
1559}
1560
1561fn has_label(event: &StoredNostrEvent, label: &str) -> bool {
1562 event.tags.iter().any(|tag| {
1563 tag.first().map(String::as_str) == Some("l")
1564 && tag.get(1).map(String::as_str) == Some(label)
1565 })
1566}
1567
1568fn has_any_label(event: &StoredNostrEvent) -> bool {
1569 event
1570 .tags
1571 .iter()
1572 .any(|tag| tag.first().map(String::as_str) == Some("l"))
1573}
1574
1575fn unique_labels(labels: Vec<String>) -> Vec<String> {
1576 let mut seen = std::collections::BTreeSet::new();
1577 let mut result = Vec::new();
1578 for label in labels {
1579 if label.is_empty() || !seen.insert(label.clone()) {
1580 continue;
1581 }
1582 result.push(label);
1583 }
1584 result
1585}
1586
1587fn validate_lower_hex(
1588 value: &str,
1589 expected_len: usize,
1590 label: &str,
1591) -> Result<String, NostrEventStoreError> {
1592 let valid = value.len() == expected_len
1593 && value
1594 .bytes()
1595 .all(|byte| byte.is_ascii_digit() || (b'a'..=b'f').contains(&byte));
1596 if valid {
1597 Ok(value.to_string())
1598 } else {
1599 Err(NostrEventStoreError::Validation(format!(
1600 "{label} must be a lowercase {expected_len}-character hex string"
1601 )))
1602 }
1603}
1604
1605fn pad_kind(kind: u32) -> String {
1606 format!("{kind:08x}")
1607}
1608
1609fn reverse_timestamp(created_at: u64) -> String {
1610 format!("{:016x}", u64::MAX - created_at)
1611}
1612
1613fn author_time_key(event: &StoredNostrEvent) -> String {
1614 format!(
1615 "{}:{}:{}",
1616 event.pubkey,
1617 reverse_timestamp(event.created_at),
1618 event.id
1619 )
1620}
1621
1622fn author_kind_time_key(event: &StoredNostrEvent) -> String {
1623 format!(
1624 "{}:{}:{}:{}",
1625 event.pubkey,
1626 pad_kind(event.kind),
1627 reverse_timestamp(event.created_at),
1628 event.id
1629 )
1630}
1631
1632fn kind_time_key(event: &StoredNostrEvent) -> String {
1633 format!(
1634 "{}:{}:{}",
1635 pad_kind(event.kind),
1636 reverse_timestamp(event.created_at),
1637 event.id
1638 )
1639}
1640
1641fn kind_time_author_key(event: &StoredNostrEvent) -> String {
1642 format!(
1643 "{}:{}:{}:{}",
1644 pad_kind(event.kind),
1645 reverse_timestamp(event.created_at),
1646 event.pubkey,
1647 event.id
1648 )
1649}
1650
1651fn kind_time_author_key_from_author_kind_time_key(
1652 key: &str,
1653) -> Result<String, NostrEventStoreError> {
1654 let mut parts = key.split(':');
1655 let Some(pubkey) = parts.next() else {
1656 return Err(NostrEventStoreError::Validation(format!(
1657 "invalid author-kind-time key: {key}"
1658 )));
1659 };
1660 let Some(kind) = parts.next() else {
1661 return Err(NostrEventStoreError::Validation(format!(
1662 "invalid author-kind-time key: {key}"
1663 )));
1664 };
1665 let Some(reversed_timestamp) = parts.next() else {
1666 return Err(NostrEventStoreError::Validation(format!(
1667 "invalid author-kind-time key: {key}"
1668 )));
1669 };
1670 let Some(event_id) = parts.next() else {
1671 return Err(NostrEventStoreError::Validation(format!(
1672 "invalid author-kind-time key: {key}"
1673 )));
1674 };
1675 if parts.next().is_some() {
1676 return Err(NostrEventStoreError::Validation(format!(
1677 "invalid author-kind-time key: {key}"
1678 )));
1679 }
1680
1681 Ok(format!(
1682 "{}:{}:{}:{}",
1683 kind, reversed_timestamp, pubkey, event_id
1684 ))
1685}
1686
1687fn time_key(event: &StoredNostrEvent) -> String {
1688 format!("{}:{}", reverse_timestamp(event.created_at), event.id)
1689}
1690
1691fn created_at_from_index_key(key: &str) -> Result<u64, NostrEventStoreError> {
1692 let mut parts = key.rsplitn(3, ':');
1693 let _event_id = parts.next();
1694 let Some(reversed) = parts.next() else {
1695 return Err(NostrEventStoreError::Validation(format!(
1696 "invalid nostr index key: {key}"
1697 )));
1698 };
1699 let reversed = u64::from_str_radix(reversed, 16).map_err(|err| {
1700 NostrEventStoreError::Validation(format!(
1701 "invalid reversed timestamp in nostr index key {key}: {err}"
1702 ))
1703 })?;
1704 Ok(u64::MAX - reversed)
1705}
1706
1707fn tag_keys(event: &StoredNostrEvent) -> Vec<String> {
1708 event
1709 .tags
1710 .iter()
1711 .filter_map(|tag| match tag.as_slice() {
1712 [name, value, ..] if !name.is_empty() && !value.is_empty() => {
1713 let normalized_name = name.to_lowercase();
1714 let normalized_value = normalize_tag_value(&normalized_name, value);
1715 Some(format!(
1716 "{}:{}:{}:{}",
1717 normalized_name,
1718 normalized_value,
1719 reverse_timestamp(event.created_at),
1720 event.id
1721 ))
1722 }
1723 _ => None,
1724 })
1725 .collect()
1726}
1727
1728fn tag_prefix(tag_name: &str, tag_value: &str) -> Result<String, NostrEventStoreError> {
1729 let normalized_name = normalize_tag_name(tag_name)?;
1730 let normalized_value = normalize_tag_value(&normalized_name, tag_value);
1731 Ok(format!("{normalized_name}:{normalized_value}:"))
1732}
1733
1734fn normalize_tag_name(tag_name: &str) -> Result<String, NostrEventStoreError> {
1735 if tag_name.is_empty() {
1736 return Err(NostrEventStoreError::Validation(
1737 "tag name must be non-empty".to_string(),
1738 ));
1739 }
1740 Ok(tag_name.to_lowercase())
1741}
1742
1743fn normalize_tag_value(tag_name: &str, tag_value: &str) -> String {
1744 if tag_name == "t" {
1745 tag_value.to_lowercase()
1746 } else {
1747 tag_value.to_string()
1748 }
1749}
1750
1751fn replaceable_key(pubkey: &str, kind: u32) -> String {
1752 format!("{}:{}", pubkey, pad_kind(kind))
1753}
1754
1755fn parameterized_replaceable_key(pubkey: &str, kind: u32, d_tag: &str) -> String {
1756 format!("{}:{}:{}", pubkey, pad_kind(kind), d_tag)
1757}
1758
1759pub fn is_replaceable_kind(kind: u32) -> bool {
1760 kind == 0 || kind == 3 || kind == 41 || (10_000..20_000).contains(&kind)
1761}
1762
1763pub fn is_parameterized_replaceable_kind(kind: u32) -> bool {
1764 (30_000..40_000).contains(&kind)
1765}
1766
1767fn get_d_tag(event: &StoredNostrEvent) -> Option<String> {
1768 event.tags.iter().find_map(|tag| match tag.as_slice() {
1769 [name, value, ..] if name == "d" && !value.is_empty() => Some(value.clone()),
1770 _ => None,
1771 })
1772}
1773
1774fn parameterized_replaceable_d_tag(event: &StoredNostrEvent) -> String {
1775 get_d_tag(event).unwrap_or_default()
1776}
1777
1778fn compare_events(left: &StoredNostrEvent, right: &StoredNostrEvent) -> i8 {
1779 match left.created_at.cmp(&right.created_at) {
1780 std::cmp::Ordering::Less => -1,
1781 std::cmp::Ordering::Greater => 1,
1782 std::cmp::Ordering::Equal => match left.id.cmp(&right.id) {
1783 std::cmp::Ordering::Less => -1,
1784 std::cmp::Ordering::Greater => 1,
1785 std::cmp::Ordering::Equal => 0,
1786 },
1787 }
1788}
1789
1790fn retain_latest_replaceable_events(events: Vec<StoredNostrEvent>) -> Vec<StoredNostrEvent> {
1791 let mut winners = BTreeMap::<(ReplaceableSlot, String), StoredNostrEvent>::new();
1792 let mut plain = Vec::new();
1793
1794 for event in events {
1795 let slot = if is_replaceable_kind(event.kind) {
1796 Some((
1797 ReplaceableSlot::Replaceable,
1798 replaceable_key(&event.pubkey, event.kind),
1799 ))
1800 } else if is_parameterized_replaceable_kind(event.kind) {
1801 Some((
1802 ReplaceableSlot::Parameterized,
1803 parameterized_replaceable_key(
1804 &event.pubkey,
1805 event.kind,
1806 ¶meterized_replaceable_d_tag(&event),
1807 ),
1808 ))
1809 } else {
1810 None
1811 };
1812
1813 if let Some(slot) = slot {
1814 match winners.get(&slot) {
1815 Some(current) if compare_events(&event, current) <= 0 => {}
1816 _ => {
1817 winners.insert(slot, event);
1818 }
1819 }
1820 } else {
1821 plain.push(event);
1822 }
1823 }
1824
1825 plain.extend(winners.into_values());
1826 plain
1827}