Skip to main content

hashtree_nostr/
lib.rs

1//! Hashtree-native Nostr event indexes.
2
3pub mod crawl;
4pub mod tree_event_snapshots;
5pub use crawl::{
6    CrawlConfig, CrawlError, CrawlReport, EventSelectionPolicy, KindPriorityPolicy, NostrBridge,
7    RelayFetchMode,
8};
9pub use tree_event_snapshots::{
10    compare_tree_event_snapshots, is_newer_tree_event_snapshot,
11    parse_tree_event_snapshot_permalink, read_tree_event_snapshot, resolve_snapshot_root_cid,
12    serialize_tree_event_snapshot_permalink, snapshot_matches_root_cid, store_tree_event_snapshot,
13    TreeEventSnapshotInfo, TreeEventSnapshotPermalink,
14};
15
16use std::collections::BTreeMap;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use std::cell::RefCell;
22
23use futures::{stream, StreamExt, TryStreamExt};
24use hashtree_collection::{
25    load_collection_manifest_metadata, load_collection_state, CollectionDefinition,
26    CollectionOptions, CollectionPublishedSchema, CollectionSource, CollectionState,
27    CollectionWriter,
28};
29use hashtree_core::{
30    sha256, BufferedStore, Cid, HashTree, HashTreeConfig, HashTreeError, Store, TreeVisibility,
31};
32use hashtree_index::{BTree, BTreeError, BTreeOptions};
33use nostr_sdk::nips::nip44::{self, Version as Nip44Version};
34use nostr_sdk::{
35    Alphabet, Event, EventBuilder, Keys, Kind, PublicKey, SingleLetterTag, Tag, TagKind,
36};
37use serde::{Deserialize, Serialize};
38
/// Version number written as the first element of the msgpack event envelope.
pub const NOSTR_EVENT_ENVELOPE_VERSION: u8 = 1;
/// Nostr event kind that is parsed and built as a hashtree root event.
pub const HASHTREE_ROOT_KIND: u32 = 30078;
/// Value of the `l` tag that labels an event as a hashtree root.
pub const HASHTREE_LABEL: &str = "hashtree";
/// Tag whose value is the hex-encoded root hash.
pub const TAG_HASH: &str = "hash";
/// Tag whose value is the hex-encoded root key (read for public trees only).
pub const TAG_KEY: &str = "key";
/// Tag whose presence makes a root event link-visible.
pub const TAG_ENCRYPTED_KEY: &str = "encryptedKey";
/// Tag carried through verbatim as `key_id` on parsed root events.
pub const TAG_KEY_ID: &str = "keyId";
/// Tag holding the root key encrypted by the owner to themselves (NIP-44).
pub const TAG_SELF_ENCRYPTED_KEY: &str = "selfEncryptedKey";
/// Tag carried through verbatim as `self_encrypted_link_key` on parsed root events.
pub const TAG_SELF_ENCRYPTED_LINK_KEY: &str = "selfEncryptedLinkKey";

// Names of the key indexes registered on the event collection.
const MANIFEST_BY_AUTHOR_TIME: &str = "by-author-time";
const MANIFEST_BY_AUTHOR_KIND_TIME: &str = "by-author-kind-time";
const MANIFEST_BY_KIND_TIME: &str = "by-kind-time";
const MANIFEST_BY_KIND_TIME_AUTHOR: &str = "by-kind-time-author";
const MANIFEST_BY_TIME: &str = "by-time";
const MANIFEST_BY_TAG: &str = "by-tag";
const MANIFEST_REPLACEABLE: &str = "replaceable";
const MANIFEST_PARAMETERIZED_REPLACEABLE: &str = "parameterized-replaceable";

// NOTE(review): presumably the number of event blobs written concurrently;
// the usage is outside this part of the file - confirm.
const EVENT_BLOB_WRITE_CONCURRENCY: usize = 64;
/// Item format advertised in the collection's published schema.
const NOSTR_EVENT_ITEM_FORMAT: &str = "nostr/event@1";
/// Projection format advertised in the collection's published schema.
const NOSTR_EVENT_PROJECTION_FORMAT: &str = "hashtree/nostr-event-index@1";
/// Default size cap applied when reading a signed event snapshot.
const MAX_SNAPSHOT_BYTES: usize = 256 * 1024;
60
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub struct StoredNostrEvent {
63    pub id: String,
64    pub pubkey: String,
65    pub created_at: u64,
66    pub kind: u32,
67    pub tags: Vec<Vec<String>>,
68    pub content: String,
69    pub sig: String,
70}
71
72/// A Nostr SDK event whose id and Schnorr signature have been checked locally.
73#[derive(Debug, Clone)]
74pub struct VerifiedEvent(Event);
75
76impl VerifiedEvent {
77    pub fn as_event(&self) -> &Event {
78        &self.0
79    }
80
81    pub fn into_event(self) -> Event {
82        self.0
83    }
84
85    pub fn to_stored_event(&self) -> VerifiedStoredNostrEvent {
86        VerifiedStoredNostrEvent {
87            event: stored_event_from_nostr_sdk_event(&self.0),
88        }
89    }
90}
91
92impl TryFrom<Event> for VerifiedEvent {
93    type Error = NostrEventStoreError;
94
95    fn try_from(event: Event) -> Result<Self, Self::Error> {
96        verify_nostr_sdk_event(&event)?;
97        Ok(Self(event))
98    }
99}
100
101impl AsRef<Event> for VerifiedEvent {
102    fn as_ref(&self) -> &Event {
103        self.as_event()
104    }
105}
106
107/// A stored Nostr event whose serialized event has been locally verified.
108#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct VerifiedStoredNostrEvent {
110    event: StoredNostrEvent,
111}
112
113impl VerifiedStoredNostrEvent {
114    pub fn as_stored(&self) -> &StoredNostrEvent {
115        &self.event
116    }
117
118    pub fn into_stored(self) -> StoredNostrEvent {
119        self.event
120    }
121
122    pub fn to_nostr_sdk_event(&self) -> Result<VerifiedEvent, NostrEventStoreError> {
123        VerifiedEvent::try_from(nostr_sdk_event_from_stored_event(&self.event)?)
124    }
125}
126
127impl TryFrom<StoredNostrEvent> for VerifiedStoredNostrEvent {
128    type Error = NostrEventStoreError;
129
130    fn try_from(event: StoredNostrEvent) -> Result<Self, Self::Error> {
131        let event = normalize_signed_event(event)?;
132        let sdk_event = nostr_sdk_event_from_stored_event(&event)?;
133        verify_nostr_sdk_event(&sdk_event)?;
134        Ok(Self { event })
135    }
136}
137
138impl AsRef<StoredNostrEvent> for VerifiedStoredNostrEvent {
139    fn as_ref(&self) -> &StoredNostrEvent {
140        self.as_stored()
141    }
142}
143
144#[derive(Debug, Clone, PartialEq)]
145pub struct ParsedHashtreeRootEvent {
146    pub event: StoredNostrEvent,
147    pub tree_name: String,
148    pub root_cid: Cid,
149    pub visibility: TreeVisibility,
150    pub labels: Vec<String>,
151    pub encrypted_key: Option<String>,
152    pub key_id: Option<String>,
153    pub self_encrypted_key: Option<String>,
154    pub self_encrypted_link_key: Option<String>,
155}
156
157#[derive(Debug, Clone, Default, PartialEq)]
158pub struct NostrEventManifest {
159    pub by_id: Option<Cid>,
160    pub by_author_time: Option<Cid>,
161    pub by_author_kind_time: Option<Cid>,
162    pub by_kind_time: Option<Cid>,
163    pub by_kind_time_author: Option<Cid>,
164    pub by_time: Option<Cid>,
165    pub by_tag: Option<Cid>,
166    pub replaceable: Option<Cid>,
167    pub parameterized_replaceable: Option<Cid>,
168}
169
/// Optional filters for listing events; every field defaults to `None`.
#[derive(Debug, Clone, Default)]
pub struct ListEventsOptions {
    /// Maximum number of events to return, if set.
    pub limit: Option<usize>,
    // NOTE(review): presumably a lower bound on `created_at` - confirm inclusivity at the call site.
    pub since: Option<u64>,
    // NOTE(review): presumably an upper bound on `created_at` - confirm inclusivity at the call site.
    pub until: Option<u64>,
}
176
/// Snapshot of the timing accumulated for one profiling label.
#[derive(Debug, Clone, Default)]
pub struct ProfileStat {
    /// Label passed to the profile guard that recorded the samples.
    pub label: &'static str,
    /// Number of recorded samples.
    pub count: u64,
    /// Sum of all recorded durations.
    pub total: Duration,
    /// Longest single recorded duration.
    pub max: Duration,
}
184
/// Per-label running totals kept in the thread-local profile state.
#[derive(Debug, Default)]
struct ProfileAccumulator {
    /// Number of samples recorded so far.
    count: u64,
    /// Sum of all sample durations.
    total: Duration,
    /// Longest sample seen so far.
    max: Duration,
}
191
192static PROFILE_ENABLED: AtomicBool = AtomicBool::new(false);
193
194thread_local! {
195    static PROFILE_STATE: RefCell<BTreeMap<&'static str, ProfileAccumulator>> =
196        const { RefCell::new(BTreeMap::new()) };
197}
198
199pub fn set_profile_enabled(enabled: bool) {
200    PROFILE_ENABLED.store(enabled, Ordering::Relaxed);
201}
202
203pub fn reset_profile() {
204    PROFILE_STATE.with(|state| state.borrow_mut().clear());
205}
206
207pub fn take_profile() -> Vec<ProfileStat> {
208    PROFILE_STATE.with(|state| {
209        state
210            .borrow()
211            .iter()
212            .map(|(&label, acc)| ProfileStat {
213                label,
214                count: acc.count,
215                total: acc.total,
216                max: acc.max,
217            })
218            .collect()
219    })
220}
221
/// Scope guard that times the region between its creation and its drop.
pub struct ProfileGuard {
    /// Label the elapsed time is accumulated under.
    label: &'static str,
    /// Start instant; `None` when profiling was disabled at creation.
    started: Option<Instant>,
}
226
227impl ProfileGuard {
228    pub fn new(label: &'static str) -> Self {
229        let started = PROFILE_ENABLED.load(Ordering::Relaxed).then(Instant::now);
230        Self { label, started }
231    }
232}
233
234impl Drop for ProfileGuard {
235    fn drop(&mut self) {
236        let Some(started) = self.started else {
237            return;
238        };
239        let elapsed = started.elapsed();
240        PROFILE_STATE.with(|state| {
241            let mut state = state.borrow_mut();
242            let entry = state.entry(self.label).or_default();
243            entry.count += 1;
244            entry.total += elapsed;
245            entry.max = entry.max.max(elapsed);
246        });
247    }
248}
249
250#[derive(Debug, thiserror::Error)]
251pub enum NostrEventStoreError {
252    #[error("hash tree error: {0}")]
253    HashTree(#[from] HashTreeError),
254    #[error("index error: {0}")]
255    Index(#[from] BTreeError),
256    #[error("collection error: {0}")]
257    Collection(#[from] hashtree_collection::CollectionError),
258    #[error("encode error: {0}")]
259    Encode(#[from] rmp_serde::encode::Error),
260    #[error("decode error: {0}")]
261    Decode(#[from] rmp_serde::decode::Error),
262    #[error("json error: {0}")]
263    Json(#[from] serde_json::Error),
264    #[error("{0}")]
265    Validation(String),
266}
267
268const MISSING_STORED_EVENT_BLOB_ERROR: &str = "stored nostr event blob is missing";
269
270fn is_missing_stored_event_error(err: &NostrEventStoreError) -> bool {
271    matches!(
272        err,
273        NostrEventStoreError::Validation(message) if message == MISSING_STORED_EVENT_BLOB_ERROR
274    )
275}
276
277pub struct NostrEventStore<S: Store> {
278    store: Arc<S>,
279    tree: HashTree<S>,
280    index: BTree<S>,
281    options: NostrEventStoreOptions,
282}
283
/// Which replaceable index an event occupies.
///
/// Variant order defines the derived ordering: `Replaceable < Parameterized`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum ReplaceableSlot {
    /// Slot in the `replaceable` index.
    Replaceable,
    /// Slot in the `parameterized-replaceable` index.
    Parameterized,
}
289
290#[derive(Debug, Clone)]
291struct ExistingReplaceableEvent {
292    event: StoredNostrEvent,
293    cid: Cid,
294}
295
296struct PreparedEventBlob {
297    sequence: usize,
298    event: StoredNostrEvent,
299    bytes: Vec<u8>,
300    previous: Option<StoredNostrEvent>,
301}
302
303#[allow(clippy::large_enum_variant)]
304#[derive(Debug, Clone)]
305enum ReplaceableDecision {
306    Accept {
307        replaced: Option<ExistingReplaceableEvent>,
308        slot: Option<(ReplaceableSlot, String)>,
309    },
310    Reject,
311}
312
313pub fn encode_signed_event_json(event: &StoredNostrEvent) -> Result<Vec<u8>, NostrEventStoreError> {
314    let normalized = normalize_signed_event(event.clone())?;
315    Ok(serde_json::to_vec(&normalized)?)
316}
317
318pub fn decode_signed_event_json(data: &[u8]) -> Result<StoredNostrEvent, NostrEventStoreError> {
319    let decoded: StoredNostrEvent = serde_json::from_slice(data)?;
320    normalize_signed_event(decoded)
321}
322
323pub fn encode_stored_event_msgpack(
324    event: &StoredNostrEvent,
325) -> Result<Vec<u8>, NostrEventStoreError> {
326    let normalized = normalize_signed_event(event.clone())?;
327    Ok(rmp_serde::to_vec(&(
328        NOSTR_EVENT_ENVELOPE_VERSION,
329        normalized.id,
330        normalized.pubkey,
331        normalized.created_at,
332        normalized.kind,
333        normalized.tags,
334        normalized.content,
335        normalized.sig,
336    ))?)
337}
338
339pub fn decode_stored_event_msgpack(data: &[u8]) -> Result<StoredNostrEvent, NostrEventStoreError> {
340    let (version, id, pubkey, created_at, kind, tags, content, sig): (
341        u8,
342        String,
343        String,
344        u64,
345        u32,
346        Vec<Vec<String>>,
347        String,
348        String,
349    ) = rmp_serde::from_slice(data)?;
350
351    if version != NOSTR_EVENT_ENVELOPE_VERSION {
352        return Err(NostrEventStoreError::Validation(format!(
353            "unsupported event envelope version: {version}"
354        )));
355    }
356
357    normalize_signed_event(StoredNostrEvent {
358        id,
359        pubkey,
360        created_at,
361        kind,
362        tags,
363        content,
364        sig,
365    })
366}
367
368pub async fn store_signed_event_snapshot<S: Store>(
369    store: Arc<S>,
370    event: &StoredNostrEvent,
371) -> Result<Cid, NostrEventStoreError> {
372    let bytes = encode_signed_event_json(event)?;
373    let tree = HashTree::new(HashTreeConfig::new(store).public());
374    let (cid, _) = tree.put(&bytes).await?;
375    Ok(cid)
376}
377
378pub async fn read_signed_event_snapshot<S: Store>(
379    store: Arc<S>,
380    snapshot_cid: &Cid,
381    max_bytes: Option<usize>,
382) -> Result<StoredNostrEvent, NostrEventStoreError> {
383    let max_bytes = max_bytes.unwrap_or(MAX_SNAPSHOT_BYTES);
384    let tree = HashTree::new(HashTreeConfig::new(store).public());
385    let data = tree
386        .get(snapshot_cid, Some((max_bytes + 1) as u64))
387        .await?
388        .ok_or_else(|| {
389            NostrEventStoreError::Validation("signed Nostr event snapshot is missing".to_string())
390        })?;
391    if data.len() > max_bytes {
392        return Err(NostrEventStoreError::Validation(format!(
393            "signed Nostr event snapshot exceeds {max_bytes} bytes"
394        )));
395    }
396    decode_signed_event_json(&data)
397}
398
399pub fn stored_event_from_nostr_sdk_event(event: &Event) -> StoredNostrEvent {
400    StoredNostrEvent {
401        id: event.id.to_hex(),
402        pubkey: event.pubkey.to_hex(),
403        created_at: event.created_at.as_secs(),
404        kind: u32::from(event.kind.as_u16()),
405        tags: event
406            .tags
407            .iter()
408            .map(|tag| tag.as_slice().to_vec())
409            .collect(),
410        content: event.content.clone(),
411        sig: event.sig.to_string(),
412    }
413}
414
415fn nostr_sdk_event_from_stored_event(
416    event: &StoredNostrEvent,
417) -> Result<Event, NostrEventStoreError> {
418    let bytes = serde_json::to_vec(event)?;
419    Ok(serde_json::from_slice(&bytes)?)
420}
421
422fn verify_nostr_sdk_event(event: &Event) -> Result<(), NostrEventStoreError> {
423    event.verify().map_err(|err| {
424        NostrEventStoreError::Validation(format!("signature verification failed: {err}"))
425    })
426}
427
428pub fn parse_hashtree_root_event(
429    event: &StoredNostrEvent,
430) -> Result<Option<ParsedHashtreeRootEvent>, NostrEventStoreError> {
431    let normalized = normalize_signed_event(event.clone())?;
432    if normalized.kind != HASHTREE_ROOT_KIND {
433        return Ok(None);
434    }
435
436    let tree_name = normalized
437        .tags
438        .iter()
439        .find(|tag| tag.first().map(String::as_str) == Some("d"))
440        .and_then(|tag| tag.get(1))
441        .cloned();
442    let hash_hex = normalized
443        .tags
444        .iter()
445        .find(|tag| tag.first().map(String::as_str) == Some(TAG_HASH))
446        .and_then(|tag| tag.get(1))
447        .cloned();
448    let (Some(tree_name), Some(hash_hex)) = (tree_name, hash_hex) else {
449        return Ok(None);
450    };
451
452    if has_any_label(&normalized) && !has_label(&normalized, HASHTREE_LABEL) {
453        return Ok(None);
454    }
455
456    let labels = unique_labels(
457        normalized
458            .tags
459            .iter()
460            .filter(|tag| tag.first().map(String::as_str) == Some("l"))
461            .filter_map(|tag| tag.get(1).cloned())
462            .collect(),
463    );
464    let key_hex = normalized
465        .tags
466        .iter()
467        .find(|tag| tag.first().map(String::as_str) == Some(TAG_KEY))
468        .and_then(|tag| tag.get(1))
469        .cloned();
470    let encrypted_key = normalized
471        .tags
472        .iter()
473        .find(|tag| tag.first().map(String::as_str) == Some(TAG_ENCRYPTED_KEY))
474        .and_then(|tag| tag.get(1))
475        .cloned();
476    let key_id = normalized
477        .tags
478        .iter()
479        .find(|tag| tag.first().map(String::as_str) == Some(TAG_KEY_ID))
480        .and_then(|tag| tag.get(1))
481        .cloned();
482    let self_encrypted_key = normalized
483        .tags
484        .iter()
485        .find(|tag| tag.first().map(String::as_str) == Some(TAG_SELF_ENCRYPTED_KEY))
486        .and_then(|tag| tag.get(1))
487        .cloned();
488    let self_encrypted_link_key = normalized
489        .tags
490        .iter()
491        .find(|tag| tag.first().map(String::as_str) == Some(TAG_SELF_ENCRYPTED_LINK_KEY))
492        .and_then(|tag| tag.get(1))
493        .cloned();
494
495    let visibility = if encrypted_key.is_some() {
496        TreeVisibility::LinkVisible
497    } else if self_encrypted_key.is_some() {
498        TreeVisibility::Private
499    } else {
500        TreeVisibility::Public
501    };
502
503    let hash = hashtree_core::from_hex(&hash_hex).map_err(|_| {
504        NostrEventStoreError::Validation(
505            "root hash must be a lowercase 64-character hex string".to_string(),
506        )
507    })?;
508    let key = match (visibility, key_hex) {
509        (TreeVisibility::Public, Some(key_hex)) => {
510            Some(hashtree_core::from_hex(&key_hex).map_err(|_| {
511                NostrEventStoreError::Validation(
512                    "root key must be a lowercase 64-character hex string".to_string(),
513                )
514            })?)
515        }
516        _ => None,
517    };
518
519    Ok(Some(ParsedHashtreeRootEvent {
520        event: normalized,
521        tree_name,
522        root_cid: Cid { hash, key },
523        visibility,
524        labels,
525        encrypted_key,
526        key_id,
527        self_encrypted_key,
528        self_encrypted_link_key,
529    }))
530}
531
532pub fn parse_verified_hashtree_root_event(
533    event: &Event,
534) -> Result<Option<ParsedHashtreeRootEvent>, NostrEventStoreError> {
535    let verified = VerifiedEvent::try_from(event.clone())?;
536    parse_hashtree_root_event(verified.to_stored_event().as_stored())
537}
538
539pub fn resolve_self_encrypted_root_cid(
540    parsed: &ParsedHashtreeRootEvent,
541    owner_keys: &Keys,
542) -> Result<Cid, NostrEventStoreError> {
543    if parsed.root_cid.key.is_some() {
544        return Ok(parsed.root_cid.clone());
545    }
546
547    let ciphertext = parsed.self_encrypted_key.as_ref().ok_or_else(|| {
548        NostrEventStoreError::Validation("hashtree root key is unavailable".to_string())
549    })?;
550    let author = PublicKey::from_hex(&parsed.event.pubkey).map_err(|err| {
551        NostrEventStoreError::Validation(format!("invalid root event pubkey: {err}"))
552    })?;
553    if owner_keys.public_key() != author {
554        return Err(NostrEventStoreError::Validation(format!(
555            "owner key {} does not match root event author {}",
556            owner_keys.public_key().to_hex(),
557            parsed.event.pubkey
558        )));
559    }
560    let key_hex = nip44::decrypt(owner_keys.secret_key(), &author, ciphertext).map_err(|_| {
561        NostrEventStoreError::Validation("hashtree root key is unavailable".to_string())
562    })?;
563    let key = hashtree_core::from_hex(&key_hex).map_err(|_| {
564        NostrEventStoreError::Validation(
565            "root key must be a lowercase 64-character hex string".to_string(),
566        )
567    })?;
568    Ok(Cid {
569        hash: parsed.root_cid.hash,
570        key: Some(key),
571    })
572}
573
574pub fn build_private_hashtree_root_event(
575    owner_keys: &Keys,
576    tree_name: &str,
577    root_cid: &Cid,
578    created_at: Option<u64>,
579) -> Result<Event, NostrEventStoreError> {
580    let Some(root_key) = root_cid.key else {
581        return Err(NostrEventStoreError::Validation(
582            "private hashtree root requires an encrypted CID".to_string(),
583        ));
584    };
585    let root_key_hex = hex::encode(root_key);
586    let self_encrypted_key = nip44::encrypt(
587        owner_keys.secret_key(),
588        &owner_keys.public_key(),
589        root_key_hex,
590        Nip44Version::V2,
591    )
592    .map_err(|err| {
593        NostrEventStoreError::Validation(format!("self-encrypted root key failed: {err}"))
594    })?;
595    let builder = EventBuilder::new(Kind::from(HASHTREE_ROOT_KIND as u16), "").tags([
596        Tag::identifier(tree_name.to_string()),
597        Tag::custom(
598            TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
599            vec![HASHTREE_LABEL],
600        ),
601        Tag::custom(
602            TagKind::Custom(TAG_HASH.into()),
603            vec![hex::encode(root_cid.hash)],
604        ),
605        Tag::custom(
606            TagKind::Custom(TAG_SELF_ENCRYPTED_KEY.into()),
607            vec![self_encrypted_key],
608        ),
609    ]);
610    let builder = if let Some(created_at) = created_at {
611        builder.custom_created_at(nostr_sdk::Timestamp::from(created_at))
612    } else {
613        builder
614    };
615    builder.sign_with_keys(owner_keys).map_err(|err| {
616        NostrEventStoreError::Validation(format!("build hashtree root event failed: {err}"))
617    })
618}
619
/// Tunables for a `NostrEventStore`.
#[derive(Debug, Clone, Default)]
pub struct NostrEventStoreOptions {
    /// Order forwarded to the B-tree and collection options; `None` leaves
    /// the choice to those layers.
    pub btree_order: Option<usize>,
}
624
625fn nostr_collection_definition() -> CollectionDefinition<StoredNostrEvent> {
626    CollectionDefinition::new(|event: &StoredNostrEvent| event.id.clone())
627        .with_published_schema(
628            CollectionPublishedSchema::new()
629                .with_item_format(NOSTR_EVENT_ITEM_FORMAT)
630                .with_projection_format(NOSTR_EVENT_PROJECTION_FORMAT),
631        )
632        .with_key_index(MANIFEST_BY_AUTHOR_TIME, |event| {
633            vec![author_time_key(event)]
634        })
635        .with_key_index(MANIFEST_BY_AUTHOR_KIND_TIME, |event| {
636            vec![author_kind_time_key(event)]
637        })
638        .with_key_index(MANIFEST_BY_KIND_TIME, |event| vec![kind_time_key(event)])
639        .with_key_index(MANIFEST_BY_KIND_TIME_AUTHOR, |event| {
640            vec![kind_time_author_key(event)]
641        })
642        .with_key_index(MANIFEST_BY_TIME, |event| vec![time_key(event)])
643        .with_key_index(MANIFEST_BY_TAG, tag_keys)
644        .with_key_index(MANIFEST_REPLACEABLE, |event| {
645            if is_replaceable_kind(event.kind) {
646                vec![replaceable_key(&event.pubkey, event.kind)]
647            } else {
648                Vec::new()
649            }
650        })
651        .with_key_index(MANIFEST_PARAMETERIZED_REPLACEABLE, |event| {
652            if is_parameterized_replaceable_kind(event.kind) {
653                vec![parameterized_replaceable_key(
654                    &event.pubkey,
655                    event.kind,
656                    &parameterized_replaceable_d_tag(event),
657                )]
658            } else {
659                Vec::new()
660            }
661        })
662}
663
664fn nostr_collection_options(options: &NostrEventStoreOptions) -> CollectionOptions {
665    CollectionOptions {
666        btree_order: options.btree_order,
667    }
668}
669
670fn collection_state_from_nostr_manifest(manifest: &NostrEventManifest) -> CollectionState {
671    let mut key_roots = BTreeMap::new();
672    key_roots.insert(
673        MANIFEST_BY_AUTHOR_TIME.to_string(),
674        manifest.by_author_time.clone(),
675    );
676    key_roots.insert(
677        MANIFEST_BY_AUTHOR_KIND_TIME.to_string(),
678        manifest.by_author_kind_time.clone(),
679    );
680    key_roots.insert(
681        MANIFEST_BY_KIND_TIME.to_string(),
682        manifest.by_kind_time.clone(),
683    );
684    key_roots.insert(
685        MANIFEST_BY_KIND_TIME_AUTHOR.to_string(),
686        manifest.by_kind_time_author.clone(),
687    );
688    key_roots.insert(MANIFEST_BY_TIME.to_string(), manifest.by_time.clone());
689    key_roots.insert(MANIFEST_BY_TAG.to_string(), manifest.by_tag.clone());
690    key_roots.insert(
691        MANIFEST_REPLACEABLE.to_string(),
692        manifest.replaceable.clone(),
693    );
694    key_roots.insert(
695        MANIFEST_PARAMETERIZED_REPLACEABLE.to_string(),
696        manifest.parameterized_replaceable.clone(),
697    );
698    CollectionState {
699        by_id_root: manifest.by_id.clone(),
700        key_roots,
701        search_roots: BTreeMap::new(),
702    }
703}
704
705fn nostr_manifest_from_collection_state(state: &CollectionState) -> NostrEventManifest {
706    NostrEventManifest {
707        by_id: state.by_id_root.clone(),
708        by_author_time: state.key_root(MANIFEST_BY_AUTHOR_TIME).cloned(),
709        by_author_kind_time: state.key_root(MANIFEST_BY_AUTHOR_KIND_TIME).cloned(),
710        by_kind_time: state.key_root(MANIFEST_BY_KIND_TIME).cloned(),
711        by_kind_time_author: state.key_root(MANIFEST_BY_KIND_TIME_AUTHOR).cloned(),
712        by_time: state.key_root(MANIFEST_BY_TIME).cloned(),
713        by_tag: state.key_root(MANIFEST_BY_TAG).cloned(),
714        replaceable: state.key_root(MANIFEST_REPLACEABLE).cloned(),
715        parameterized_replaceable: state.key_root(MANIFEST_PARAMETERIZED_REPLACEABLE).cloned(),
716    }
717}
718
719impl<S: Store> NostrEventStore<S> {
720    pub fn new(store: Arc<S>) -> Self {
721        Self::with_options(store, NostrEventStoreOptions::default())
722    }
723
724    pub fn with_options(store: Arc<S>, options: NostrEventStoreOptions) -> Self {
725        Self {
726            store: Arc::clone(&store),
727            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
728            index: BTree::new(
729                store,
730                BTreeOptions {
731                    order: options.btree_order,
732                },
733            ),
734            options,
735        }
736    }
737
738    pub fn encode_event(&self, event: &StoredNostrEvent) -> Result<Vec<u8>, NostrEventStoreError> {
739        encode_stored_event_msgpack(&self.validate_event_shape(event.clone())?)
740    }
741
742    pub fn decode_event(&self, data: &[u8]) -> Result<StoredNostrEvent, NostrEventStoreError> {
743        self.validate_event_shape(decode_stored_event_msgpack(data)?)
744    }
745
746    pub fn decode_verified_event(
747        &self,
748        data: &[u8],
749    ) -> Result<VerifiedStoredNostrEvent, NostrEventStoreError> {
750        VerifiedStoredNostrEvent::try_from(self.decode_event(data)?)
751    }
752
753    pub async fn validate_index_root(
754        &self,
755        root: Option<&Cid>,
756    ) -> Result<(), NostrEventStoreError> {
757        let Some(root) = root else {
758            return Ok(());
759        };
760
761        let manifest = self.get_manifest(Some(root)).await?;
762        let mut missing = Vec::new();
763        if manifest.by_id.is_none() {
764            missing.push("by-id");
765        }
766        if manifest.by_author_time.is_none() {
767            missing.push(MANIFEST_BY_AUTHOR_TIME);
768        }
769        if manifest.by_author_kind_time.is_none() {
770            missing.push(MANIFEST_BY_AUTHOR_KIND_TIME);
771        }
772        if manifest.by_kind_time.is_none() {
773            missing.push(MANIFEST_BY_KIND_TIME);
774        }
775        if manifest.by_time.is_none() {
776            missing.push(MANIFEST_BY_TIME);
777        }
778        if !missing.is_empty() {
779            return Err(NostrEventStoreError::Validation(format!(
780                "nostr event index root missing required manifest entries: {}",
781                missing.join(", ")
782            )));
783        }
784
785        if let Some(metadata) =
786            load_collection_manifest_metadata(Arc::clone(&self.store), Some(root)).await?
787        {
788            let schema = metadata.published_schema();
789            if let Some(item_format) = schema.and_then(|schema| schema.item_format()) {
790                if item_format != NOSTR_EVENT_ITEM_FORMAT {
791                    return Err(NostrEventStoreError::Validation(format!(
792                        "nostr event index item format mismatch: expected {NOSTR_EVENT_ITEM_FORMAT}, got {item_format}"
793                    )));
794                }
795            }
796            if let Some(projection_format) = schema.and_then(|schema| schema.projection_format()) {
797                if projection_format != NOSTR_EVENT_PROJECTION_FORMAT {
798                    return Err(NostrEventStoreError::Validation(format!(
799                        "nostr event index projection format mismatch: expected {NOSTR_EVENT_PROJECTION_FORMAT}, got {projection_format}"
800                    )));
801                }
802            }
803        }
804
805        Ok(())
806    }
807
808    pub async fn add(
809        &self,
810        root: Option<&Cid>,
811        event: StoredNostrEvent,
812    ) -> Result<Cid, NostrEventStoreError> {
813        let buffered_store = Arc::new(BufferedStore::new_optimistic(Arc::clone(&self.store)));
814        let buffered_writer =
815            NostrEventStore::with_options(Arc::clone(&buffered_store), self.options.clone());
816        let normalized = {
817            let _profile = ProfileGuard::new("nostr.add.validate_event");
818            buffered_writer.validate_event(event).await?
819        };
820        let mut manifest = {
821            let _profile = ProfileGuard::new("nostr.add.get_manifest");
822            buffered_writer.get_manifest(root).await?
823        };
824        let mut obsolete_event_cids = Vec::new();
825        buffered_writer
826            .insert_into_manifest(&mut manifest, normalized, &mut obsolete_event_cids)
827            .await?;
828
829        let Some(root) = ({
830            let _profile = ProfileGuard::new("nostr.add.write_manifest");
831            buffered_writer.write_manifest(&manifest).await?
832        }) else {
833            return Err(NostrEventStoreError::Validation(
834                "failed to write event manifest".to_string(),
835            ));
836        };
837        {
838            let _profile = ProfileGuard::new("nostr.add.flush");
839            buffered_store.flush().await.map_err(|err| {
840                NostrEventStoreError::HashTree(HashTreeError::Store(err.to_string()))
841            })?;
842        }
843        self.delete_obsolete_event_blobs(&obsolete_event_cids)
844            .await?;
845        Ok(root)
846    }
847
848    pub async fn build<I>(
849        &self,
850        root: Option<&Cid>,
851        events: I,
852    ) -> Result<Option<Cid>, NostrEventStoreError>
853    where
854        I: IntoIterator<Item = StoredNostrEvent>,
855    {
856        let mut events: Vec<StoredNostrEvent> = events.into_iter().collect();
857        if events.is_empty() {
858            return Ok(root.cloned());
859        }
860        events = retain_latest_replaceable_events(events);
861        events.sort_by(|left, right| match compare_events(left, right) {
862            x if x < 0 => std::cmp::Ordering::Less,
863            x if x > 0 => std::cmp::Ordering::Greater,
864            _ => std::cmp::Ordering::Equal,
865        });
866
867        let buffered_store = Arc::new(BufferedStore::new_optimistic(Arc::clone(&self.store)));
868        let buffered_writer =
869            NostrEventStore::with_options(Arc::clone(&buffered_store), self.options.clone());
870        let mut obsolete_event_cids = Vec::new();
871        let next_root = if root.is_none() {
872            buffered_writer.build_manifest_from_events(events).await?
873        } else {
874            let mut manifest = buffered_writer.get_manifest(root).await?;
875            let mut prepared_non_replaceable = Vec::new();
876            for (sequence, event) in events.into_iter().enumerate() {
877                let normalized = buffered_writer.validate_event(event).await?;
878                if is_replaceable_kind(normalized.kind)
879                    || is_parameterized_replaceable_kind(normalized.kind)
880                {
881                    buffered_writer
882                        .insert_into_manifest(&mut manifest, normalized, &mut obsolete_event_cids)
883                        .await?;
884                } else {
885                    let previous = match buffered_writer
886                        .manifest_event_cid(&manifest, &normalized.id)
887                        .await?
888                    {
889                        Some(existing_cid) => {
890                            match buffered_writer.read_stored_event(&existing_cid).await {
891                                Ok(_) => continue,
892                                Err(err) if is_missing_stored_event_error(&err) => {
893                                    Some(normalized.clone())
894                                }
895                                Err(err) => return Err(err),
896                            }
897                        }
898                        None => None,
899                    };
900                    prepared_non_replaceable
901                        .push(buffered_writer.prepare_event_blob(sequence, normalized, previous)?);
902                }
903            }
904
905            let indexed_events = buffered_writer
906                .put_prepared_event_blobs_parallel(prepared_non_replaceable)
907                .await?;
908            if !indexed_events.is_empty() {
909                let mut collection = buffered_writer.collection_writer_from_manifest(&manifest);
910                for (_sequence, event, event_cid, previous) in indexed_events {
911                    collection
912                        .put(&event, &event_cid, previous.as_ref())
913                        .await?;
914                }
915                manifest = nostr_manifest_from_collection_state(collection.state());
916            }
917            buffered_writer.write_manifest(&manifest).await?
918        };
919        buffered_store
920            .flush()
921            .await
922            .map_err(|err| NostrEventStoreError::HashTree(HashTreeError::Store(err.to_string())))?;
923        self.delete_obsolete_event_blobs(&obsolete_event_cids)
924            .await?;
925        Ok(next_root)
926    }
927
928    pub async fn upgrade_manifest_indexes(
929        &self,
930        root: Option<&Cid>,
931    ) -> Result<Option<Cid>, NostrEventStoreError> {
932        let Some(root) = root else {
933            return Ok(None);
934        };
935
936        let manifest = self.get_manifest(Some(root)).await?;
937        let buffered_store = Arc::new(BufferedStore::new_optimistic(Arc::clone(&self.store)));
938        let buffered_writer =
939            NostrEventStore::with_options(Arc::clone(&buffered_store), self.options.clone());
940
941        let Some(next_manifest) = buffered_writer
942            .upgrade_manifest_with_missing_indexes(&manifest)
943            .await?
944        else {
945            return Ok(Some(root.clone()));
946        };
947
948        let next_root = buffered_writer.write_manifest(&next_manifest).await?;
949        buffered_store
950            .flush()
951            .await
952            .map_err(|err| NostrEventStoreError::HashTree(HashTreeError::Store(err.to_string())))?;
953        Ok(next_root)
954    }
955
956    pub async fn get_by_id(
957        &self,
958        root: Option<&Cid>,
959        event_id: &str,
960    ) -> Result<Option<StoredNostrEvent>, NostrEventStoreError> {
961        validate_lower_hex(event_id, 64, "event id")?;
962        let manifest = self.get_manifest(root).await?;
963        let source = self.collection_source_from_manifest(&manifest);
964        let Some(event_cid) = source.get(event_id).await? else {
965            return Ok(None);
966        };
967        match self.read_stored_event(&event_cid).await {
968            Ok(event) => Ok(Some(event)),
969            Err(err) if is_missing_stored_event_error(&err) => Ok(None),
970            Err(err) => Err(err),
971        }
972    }
973
974    pub async fn get_verified_by_id(
975        &self,
976        root: Option<&Cid>,
977        event_id: &str,
978    ) -> Result<Option<VerifiedStoredNostrEvent>, NostrEventStoreError> {
979        self.get_by_id(root, event_id)
980            .await?
981            .map(VerifiedStoredNostrEvent::try_from)
982            .transpose()
983    }
984
985    pub async fn list_by_author(
986        &self,
987        root: Option<&Cid>,
988        pubkey: &str,
989        options: ListEventsOptions,
990    ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
991        let manifest = self.get_manifest(root).await?;
992        let Some(by_author_time) = manifest.by_author_time.as_ref() else {
993            return Ok(Vec::new());
994        };
995        self.collect_events(
996            by_author_time,
997            &format!("{}:", validate_lower_hex(pubkey, 64, "pubkey")?),
998            &options,
999        )
1000        .await
1001    }
1002
1003    pub async fn list_by_author_lossy(
1004        &self,
1005        root: Option<&Cid>,
1006        pubkey: &str,
1007        options: ListEventsOptions,
1008    ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1009        let manifest = self.get_manifest(root).await?;
1010        let Some(by_author_time) = manifest.by_author_time.as_ref() else {
1011            return Ok(Vec::new());
1012        };
1013        self.collect_events_lossy(
1014            by_author_time,
1015            &format!("{}:", validate_lower_hex(pubkey, 64, "pubkey")?),
1016            &options,
1017        )
1018        .await
1019    }
1020
1021    pub async fn list_by_author_and_kind(
1022        &self,
1023        root: Option<&Cid>,
1024        pubkey: &str,
1025        kind: u32,
1026        options: ListEventsOptions,
1027    ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1028        let manifest = self.get_manifest(root).await?;
1029        let Some(by_author_kind_time) = manifest.by_author_kind_time.as_ref() else {
1030            return Ok(Vec::new());
1031        };
1032
1033        self.collect_events(
1034            by_author_kind_time,
1035            &format!(
1036                "{}:{}:",
1037                validate_lower_hex(pubkey, 64, "pubkey")?,
1038                pad_kind(kind)
1039            ),
1040            &options,
1041        )
1042        .await
1043    }
1044
1045    pub async fn list_by_kind(
1046        &self,
1047        root: Option<&Cid>,
1048        kind: u32,
1049        options: ListEventsOptions,
1050    ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1051        let manifest = self.get_manifest(root).await?;
1052        let Some(by_kind_time) = manifest.by_kind_time.as_ref() else {
1053            return Ok(Vec::new());
1054        };
1055
1056        self.collect_events(by_kind_time, &format!("{}:", pad_kind(kind)), &options)
1057            .await
1058    }
1059
1060    pub async fn list_by_kind_lossy(
1061        &self,
1062        root: Option<&Cid>,
1063        kind: u32,
1064        options: ListEventsOptions,
1065    ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1066        let manifest = self.get_manifest(root).await?;
1067        let Some(by_kind_time) = manifest.by_kind_time.as_ref() else {
1068            return Ok(Vec::new());
1069        };
1070
1071        self.collect_events_lossy(by_kind_time, &format!("{}:", pad_kind(kind)), &options)
1072            .await
1073    }
1074
1075    pub async fn get_replaceable(
1076        &self,
1077        root: Option<&Cid>,
1078        pubkey: &str,
1079        kind: u32,
1080    ) -> Result<Option<StoredNostrEvent>, NostrEventStoreError> {
1081        let manifest = self.get_manifest(root).await?;
1082        let key = replaceable_key(&validate_lower_hex(pubkey, 64, "pubkey")?, kind);
1083        let source = self.collection_source_from_manifest(&manifest);
1084        let Some(cid) = source.get_index_link(MANIFEST_REPLACEABLE, &key).await? else {
1085            return Ok(None);
1086        };
1087        match self.read_stored_event(&cid).await {
1088            Ok(event) => Ok(Some(event)),
1089            Err(err) if is_missing_stored_event_error(&err) => Ok(None),
1090            Err(err) => Err(err),
1091        }
1092    }
1093
1094    pub async fn list_recent(
1095        &self,
1096        root: Option<&Cid>,
1097        options: ListEventsOptions,
1098    ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1099        let manifest = self.get_manifest(root).await?;
1100        let Some(by_time) = manifest.by_time.as_ref() else {
1101            return Ok(Vec::new());
1102        };
1103        self.collect_events(by_time, "", &options).await
1104    }
1105
1106    pub async fn list_recent_lossy(
1107        &self,
1108        root: Option<&Cid>,
1109        options: ListEventsOptions,
1110    ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1111        let manifest = self.get_manifest(root).await?;
1112        let Some(by_time) = manifest.by_time.as_ref() else {
1113            return Ok(Vec::new());
1114        };
1115        self.collect_events_lossy(by_time, "", &options).await
1116    }
1117
1118    pub async fn list_by_tag(
1119        &self,
1120        root: Option<&Cid>,
1121        tag_name: &str,
1122        tag_value: &str,
1123        options: ListEventsOptions,
1124    ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1125        let manifest = self.get_manifest(root).await?;
1126        let Some(by_tag) = manifest.by_tag.as_ref() else {
1127            return Ok(Vec::new());
1128        };
1129        let prefix = tag_prefix(tag_name, tag_value)?;
1130        self.collect_events(by_tag, &prefix, &options).await
1131    }
1132
1133    pub async fn get_parameterized_replaceable(
1134        &self,
1135        root: Option<&Cid>,
1136        pubkey: &str,
1137        kind: u32,
1138        d_tag: &str,
1139    ) -> Result<Option<StoredNostrEvent>, NostrEventStoreError> {
1140        let manifest = self.get_manifest(root).await?;
1141        let key =
1142            parameterized_replaceable_key(&validate_lower_hex(pubkey, 64, "pubkey")?, kind, d_tag);
1143        let source = self.collection_source_from_manifest(&manifest);
1144        let Some(cid) = source
1145            .get_index_link(MANIFEST_PARAMETERIZED_REPLACEABLE, &key)
1146            .await?
1147        else {
1148            return Ok(None);
1149        };
1150        match self.read_stored_event(&cid).await {
1151            Ok(event) => Ok(Some(event)),
1152            Err(err) if is_missing_stored_event_error(&err) => Ok(None),
1153            Err(err) => Err(err),
1154        }
1155    }
1156
1157    pub async fn get_manifest(
1158        &self,
1159        root: Option<&Cid>,
1160    ) -> Result<NostrEventManifest, NostrEventStoreError> {
1161        let definition = nostr_collection_definition();
1162        let state = load_collection_state(Arc::clone(&self.store), &definition, root).await?;
1163        Ok(nostr_manifest_from_collection_state(&state))
1164    }
1165
1166    async fn collect_events(
1167        &self,
1168        root: &Cid,
1169        prefix: &str,
1170        options: &ListEventsOptions,
1171    ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1172        let mut events = Vec::new();
1173        let entries = if prefix.is_empty() {
1174            match options.limit {
1175                Some(limit) => self.index.links_entries_limited(Some(root), limit).await?,
1176                None => self.index.links_entries(Some(root)).await?,
1177            }
1178        } else {
1179            match options.limit {
1180                Some(limit) => self.index.prefix_links_limited(root, prefix, limit).await?,
1181                None => self.index.prefix_links(root, prefix).await?,
1182            }
1183        };
1184        for (key, cid) in entries {
1185            let created_at = created_at_from_index_key(&key)?;
1186            if options.until.is_some_and(|until| created_at > until) {
1187                continue;
1188            }
1189            if options.since.is_some_and(|since| created_at < since) {
1190                break;
1191            }
1192            events.push(self.read_stored_event(&cid).await?);
1193            if options.limit.is_some_and(|limit| events.len() >= limit) {
1194                break;
1195            }
1196        }
1197        Ok(events)
1198    }
1199
1200    async fn collect_events_lossy(
1201        &self,
1202        root: &Cid,
1203        prefix: &str,
1204        options: &ListEventsOptions,
1205    ) -> Result<Vec<StoredNostrEvent>, NostrEventStoreError> {
1206        let mut events = Vec::new();
1207        let entries = if prefix.is_empty() {
1208            match options.limit {
1209                Some(limit) => self.index.links_entries_limited(Some(root), limit).await?,
1210                None => self.index.links_entries(Some(root)).await?,
1211            }
1212        } else {
1213            match options.limit {
1214                Some(limit) => self.index.prefix_links_limited(root, prefix, limit).await?,
1215                None => self.index.prefix_links(root, prefix).await?,
1216            }
1217        };
1218        for (key, cid) in entries {
1219            let created_at = created_at_from_index_key(&key)?;
1220            if options.until.is_some_and(|until| created_at > until) {
1221                continue;
1222            }
1223            if options.since.is_some_and(|since| created_at < since) {
1224                break;
1225            }
1226            match self.read_stored_event(&cid).await {
1227                Ok(event) => events.push(event),
1228                Err(NostrEventStoreError::Validation(message))
1229                    if message == MISSING_STORED_EVENT_BLOB_ERROR => {}
1230                Err(err) => return Err(err),
1231            }
1232            if options.limit.is_some_and(|limit| events.len() >= limit) {
1233                break;
1234            }
1235        }
1236        Ok(events)
1237    }
1238
1239    async fn read_stored_event(&self, cid: &Cid) -> Result<StoredNostrEvent, NostrEventStoreError> {
1240        let Some(data) = self.tree.get(cid, None).await? else {
1241            return Err(NostrEventStoreError::Validation(
1242                MISSING_STORED_EVENT_BLOB_ERROR.to_string(),
1243            ));
1244        };
1245        self.decode_event(&data)
1246    }
1247
1248    async fn manifest_event_cid(
1249        &self,
1250        manifest: &NostrEventManifest,
1251        id: &str,
1252    ) -> Result<Option<Cid>, NostrEventStoreError> {
1253        self.collection_source_from_manifest(manifest)
1254            .get(id)
1255            .await
1256            .map_err(Into::into)
1257    }
1258
1259    fn prepare_event_blob(
1260        &self,
1261        sequence: usize,
1262        event: StoredNostrEvent,
1263        previous: Option<StoredNostrEvent>,
1264    ) -> Result<PreparedEventBlob, NostrEventStoreError> {
1265        let bytes = self.encode_validated_event(&event)?;
1266        Ok(PreparedEventBlob {
1267            sequence,
1268            event,
1269            bytes,
1270            previous,
1271        })
1272    }
1273
1274    async fn put_prepared_event_blobs_parallel(
1275        &self,
1276        prepared_events: Vec<PreparedEventBlob>,
1277    ) -> Result<Vec<(usize, StoredNostrEvent, Cid, Option<StoredNostrEvent>)>, NostrEventStoreError>
1278    {
1279        let tree = &self.tree;
1280        let mut indexed_events =
1281            stream::iter(prepared_events.into_iter().map(|prepared| async move {
1282                let (event_cid, _size) = tree.put_file(&prepared.bytes).await?;
1283                Ok::<_, NostrEventStoreError>((
1284                    prepared.sequence,
1285                    prepared.event,
1286                    event_cid,
1287                    prepared.previous,
1288                ))
1289            }))
1290            .buffer_unordered(EVENT_BLOB_WRITE_CONCURRENCY)
1291            .try_collect::<Vec<_>>()
1292            .await?;
1293
1294        indexed_events.sort_by_key(|(sequence, _, _, _)| *sequence);
1295        Ok(indexed_events)
1296    }
1297
1298    async fn insert_into_manifest(
1299        &self,
1300        manifest: &mut NostrEventManifest,
1301        normalized: StoredNostrEvent,
1302        obsolete_event_cids: &mut Vec<Cid>,
1303    ) -> Result<(), NostrEventStoreError> {
1304        let replaceable = self
1305            .resolve_replaceable_decision(manifest, &normalized)
1306            .await?;
1307        let (_replaceable_slot, replaced_existing) = match replaceable {
1308            ReplaceableDecision::Reject => return Ok(()),
1309            ReplaceableDecision::Accept { replaced, slot } => (slot, replaced),
1310        };
1311
1312        let event_bytes = {
1313            let _profile = ProfileGuard::new("nostr.add.encode_event");
1314            self.encode_validated_event(&normalized)?
1315        };
1316        let (event_cid, _size) = {
1317            let _profile = ProfileGuard::new("nostr.add.put_file");
1318            self.tree.put_file(&event_bytes).await?
1319        };
1320
1321        let mut collection = self.collection_writer_from_manifest(manifest);
1322        {
1323            let _profile = ProfileGuard::new("nostr.add.index.collection");
1324            collection
1325                .put(
1326                    &normalized,
1327                    &event_cid,
1328                    replaced_existing.as_ref().map(|existing| &existing.event),
1329                )
1330                .await?;
1331        }
1332        *manifest = nostr_manifest_from_collection_state(collection.state());
1333
1334        if let Some(existing) = replaced_existing.as_ref() {
1335            obsolete_event_cids.push(existing.cid.clone());
1336        }
1337
1338        Ok(())
1339    }
1340
1341    async fn build_manifest_from_events(
1342        &self,
1343        events: Vec<StoredNostrEvent>,
1344    ) -> Result<Option<Cid>, NostrEventStoreError> {
1345        let mut prepared_events = Vec::with_capacity(events.len());
1346
1347        for (sequence, event) in events.into_iter().enumerate() {
1348            let normalized = self.validate_event(event).await?;
1349            prepared_events.push(self.prepare_event_blob(sequence, normalized, None)?);
1350        }
1351        let indexed_events = self
1352            .put_prepared_event_blobs_parallel(prepared_events)
1353            .await?
1354            .into_iter()
1355            .map(|(_sequence, event, event_cid, _previous)| (event, event_cid))
1356            .collect::<Vec<_>>();
1357
1358        let mut collection = CollectionWriter::with_options(
1359            Arc::clone(&self.store),
1360            nostr_collection_definition(),
1361            nostr_collection_options(&self.options),
1362        );
1363        collection.rebuild(indexed_events).await?;
1364        collection.write_root().await.map_err(Into::into)
1365    }
1366
1367    async fn upgrade_manifest_with_missing_indexes(
1368        &self,
1369        manifest: &NostrEventManifest,
1370    ) -> Result<Option<NostrEventManifest>, NostrEventStoreError> {
1371        let mut next_manifest = manifest.clone();
1372        let mut changed = false;
1373
1374        if next_manifest.by_kind_time_author.is_none() {
1375            if let Some(by_author_kind_time_root) = manifest.by_author_kind_time.as_ref() {
1376                let entries = self
1377                    .index
1378                    .links_entries(Some(by_author_kind_time_root))
1379                    .await?;
1380                let mut derived = BTreeMap::new();
1381                for (key, cid) in entries {
1382                    derived.insert(kind_time_author_key_from_author_kind_time_key(&key)?, cid);
1383                }
1384                next_manifest.by_kind_time_author = self.index.build_links(derived).await?;
1385                changed = true;
1386            }
1387        }
1388
1389        if changed {
1390            Ok(Some(next_manifest))
1391        } else {
1392            Ok(None)
1393        }
1394    }
1395
1396    async fn resolve_replaceable_decision(
1397        &self,
1398        manifest: &NostrEventManifest,
1399        event: &StoredNostrEvent,
1400    ) -> Result<ReplaceableDecision, NostrEventStoreError> {
1401        let slot = if is_replaceable_kind(event.kind) {
1402            Some((
1403                ReplaceableSlot::Replaceable,
1404                replaceable_key(&event.pubkey, event.kind),
1405            ))
1406        } else if is_parameterized_replaceable_kind(event.kind) {
1407            Some((
1408                ReplaceableSlot::Parameterized,
1409                parameterized_replaceable_key(
1410                    &event.pubkey,
1411                    event.kind,
1412                    &parameterized_replaceable_d_tag(event),
1413                ),
1414            ))
1415        } else {
1416            None
1417        };
1418
1419        let Some((slot_kind, key)) = slot else {
1420            return Ok(ReplaceableDecision::Accept {
1421                replaced: None,
1422                slot: None,
1423            });
1424        };
1425
1426        let source = self.collection_source_from_manifest(manifest);
1427        let existing_cid = match slot_kind {
1428            ReplaceableSlot::Replaceable => {
1429                source.get_index_link(MANIFEST_REPLACEABLE, &key).await?
1430            }
1431            ReplaceableSlot::Parameterized => {
1432                source
1433                    .get_index_link(MANIFEST_PARAMETERIZED_REPLACEABLE, &key)
1434                    .await?
1435            }
1436        };
1437
1438        let Some(existing_cid) = existing_cid else {
1439            return Ok(ReplaceableDecision::Accept {
1440                replaced: None,
1441                slot: Some((slot_kind, key)),
1442            });
1443        };
1444
1445        let existing = match self.read_stored_event(&existing_cid).await {
1446            Ok(existing) => existing,
1447            Err(err) if is_missing_stored_event_error(&err) => {
1448                return Ok(ReplaceableDecision::Accept {
1449                    replaced: None,
1450                    slot: Some((slot_kind, key)),
1451                });
1452            }
1453            Err(err) => return Err(err),
1454        };
1455        if compare_events(event, &existing) > 0 {
1456            return Ok(ReplaceableDecision::Accept {
1457                replaced: Some(ExistingReplaceableEvent {
1458                    event: existing,
1459                    cid: existing_cid,
1460                }),
1461                slot: Some((slot_kind, key)),
1462            });
1463        }
1464
1465        Ok(ReplaceableDecision::Reject)
1466    }
1467
1468    async fn delete_obsolete_event_blobs(
1469        &self,
1470        obsolete_event_cids: &[Cid],
1471    ) -> Result<(), NostrEventStoreError> {
1472        for cid in obsolete_event_cids {
1473            self.store.delete(&cid.hash).await.map_err(|err| {
1474                NostrEventStoreError::HashTree(HashTreeError::Store(err.to_string()))
1475            })?;
1476        }
1477        Ok(())
1478    }
1479
1480    async fn write_manifest(
1481        &self,
1482        manifest: &NostrEventManifest,
1483    ) -> Result<Option<Cid>, NostrEventStoreError> {
1484        let collection = self.collection_writer_from_manifest(manifest);
1485        Ok(collection.write_root().await?)
1486    }
1487
1488    fn collection_source_from_manifest(
1489        &self,
1490        manifest: &NostrEventManifest,
1491    ) -> CollectionSource<S> {
1492        CollectionSource::new(
1493            Arc::clone(&self.store),
1494            collection_state_from_nostr_manifest(manifest),
1495        )
1496    }
1497
1498    fn collection_writer_from_manifest(
1499        &self,
1500        manifest: &NostrEventManifest,
1501    ) -> CollectionWriter<S, StoredNostrEvent> {
1502        CollectionWriter::with_state_and_options(
1503            Arc::clone(&self.store),
1504            nostr_collection_definition(),
1505            collection_state_from_nostr_manifest(manifest),
1506            nostr_collection_options(&self.options),
1507        )
1508    }
1509
1510    async fn validate_event(
1511        &self,
1512        event: StoredNostrEvent,
1513    ) -> Result<StoredNostrEvent, NostrEventStoreError> {
1514        let normalized = self.validate_event_shape(event)?;
1515        let payload = serde_json::to_string(&(
1516            0u8,
1517            normalized.pubkey.clone(),
1518            normalized.created_at,
1519            normalized.kind,
1520            normalized.tags.clone(),
1521            normalized.content.clone(),
1522        ))?;
1523        let computed = hex::encode(sha256(payload.as_bytes()));
1524        if computed != normalized.id {
1525            return Err(NostrEventStoreError::Validation(
1526                "event id does not match canonical nostr payload".to_string(),
1527            ));
1528        }
1529        Ok(normalized)
1530    }
1531
1532    fn encode_validated_event(
1533        &self,
1534        event: &StoredNostrEvent,
1535    ) -> Result<Vec<u8>, NostrEventStoreError> {
1536        encode_stored_event_msgpack(event)
1537    }
1538
1539    fn validate_event_shape(
1540        &self,
1541        event: StoredNostrEvent,
1542    ) -> Result<StoredNostrEvent, NostrEventStoreError> {
1543        normalize_signed_event(event)
1544    }
1545}
1546
1547fn normalize_signed_event(
1548    event: StoredNostrEvent,
1549) -> Result<StoredNostrEvent, NostrEventStoreError> {
1550    Ok(StoredNostrEvent {
1551        id: validate_lower_hex(&event.id, 64, "event id")?,
1552        pubkey: validate_lower_hex(&event.pubkey, 64, "pubkey")?,
1553        created_at: event.created_at,
1554        kind: event.kind,
1555        tags: event.tags,
1556        content: event.content,
1557        sig: validate_lower_hex(&event.sig, 128, "signature")?,
1558    })
1559}
1560
1561fn has_label(event: &StoredNostrEvent, label: &str) -> bool {
1562    event.tags.iter().any(|tag| {
1563        tag.first().map(String::as_str) == Some("l")
1564            && tag.get(1).map(String::as_str) == Some(label)
1565    })
1566}
1567
1568fn has_any_label(event: &StoredNostrEvent) -> bool {
1569    event
1570        .tags
1571        .iter()
1572        .any(|tag| tag.first().map(String::as_str) == Some("l"))
1573}
1574
/// Removes empty strings and repeated labels, keeping the first occurrence of
/// each label in its original position.
fn unique_labels(labels: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::BTreeSet::new();
    labels
        .into_iter()
        // `insert` returns false for a label that was already recorded.
        .filter(|label| !label.is_empty() && seen.insert(label.clone()))
        .collect()
}
1586
1587fn validate_lower_hex(
1588    value: &str,
1589    expected_len: usize,
1590    label: &str,
1591) -> Result<String, NostrEventStoreError> {
1592    let valid = value.len() == expected_len
1593        && value
1594            .bytes()
1595            .all(|byte| byte.is_ascii_digit() || (b'a'..=b'f').contains(&byte));
1596    if valid {
1597        Ok(value.to_string())
1598    } else {
1599        Err(NostrEventStoreError::Validation(format!(
1600            "{label} must be a lowercase {expected_len}-character hex string"
1601        )))
1602    }
1603}
1604
/// Formats `kind` as 8 zero-padded lowercase hex digits, so that string order
/// matches numeric order.
fn pad_kind(kind: u32) -> String {
    format!("{:08x}", kind)
}
1608
/// Formats `u64::MAX - created_at` as 16 zero-padded lowercase hex digits, so
/// that ascending string order corresponds to descending timestamps.
fn reverse_timestamp(created_at: u64) -> String {
    let remaining = u64::MAX - created_at;
    format!("{remaining:016x}")
}
1612
1613fn author_time_key(event: &StoredNostrEvent) -> String {
1614    format!(
1615        "{}:{}:{}",
1616        event.pubkey,
1617        reverse_timestamp(event.created_at),
1618        event.id
1619    )
1620}
1621
1622fn author_kind_time_key(event: &StoredNostrEvent) -> String {
1623    format!(
1624        "{}:{}:{}:{}",
1625        event.pubkey,
1626        pad_kind(event.kind),
1627        reverse_timestamp(event.created_at),
1628        event.id
1629    )
1630}
1631
1632fn kind_time_key(event: &StoredNostrEvent) -> String {
1633    format!(
1634        "{}:{}:{}",
1635        pad_kind(event.kind),
1636        reverse_timestamp(event.created_at),
1637        event.id
1638    )
1639}
1640
1641fn kind_time_author_key(event: &StoredNostrEvent) -> String {
1642    format!(
1643        "{}:{}:{}:{}",
1644        pad_kind(event.kind),
1645        reverse_timestamp(event.created_at),
1646        event.pubkey,
1647        event.id
1648    )
1649}
1650
1651fn kind_time_author_key_from_author_kind_time_key(
1652    key: &str,
1653) -> Result<String, NostrEventStoreError> {
1654    let mut parts = key.split(':');
1655    let Some(pubkey) = parts.next() else {
1656        return Err(NostrEventStoreError::Validation(format!(
1657            "invalid author-kind-time key: {key}"
1658        )));
1659    };
1660    let Some(kind) = parts.next() else {
1661        return Err(NostrEventStoreError::Validation(format!(
1662            "invalid author-kind-time key: {key}"
1663        )));
1664    };
1665    let Some(reversed_timestamp) = parts.next() else {
1666        return Err(NostrEventStoreError::Validation(format!(
1667            "invalid author-kind-time key: {key}"
1668        )));
1669    };
1670    let Some(event_id) = parts.next() else {
1671        return Err(NostrEventStoreError::Validation(format!(
1672            "invalid author-kind-time key: {key}"
1673        )));
1674    };
1675    if parts.next().is_some() {
1676        return Err(NostrEventStoreError::Validation(format!(
1677            "invalid author-kind-time key: {key}"
1678        )));
1679    }
1680
1681    Ok(format!(
1682        "{}:{}:{}:{}",
1683        kind, reversed_timestamp, pubkey, event_id
1684    ))
1685}
1686
1687fn time_key(event: &StoredNostrEvent) -> String {
1688    format!("{}:{}", reverse_timestamp(event.created_at), event.id)
1689}
1690
1691fn created_at_from_index_key(key: &str) -> Result<u64, NostrEventStoreError> {
1692    let mut parts = key.rsplitn(3, ':');
1693    let _event_id = parts.next();
1694    let Some(reversed) = parts.next() else {
1695        return Err(NostrEventStoreError::Validation(format!(
1696            "invalid nostr index key: {key}"
1697        )));
1698    };
1699    let reversed = u64::from_str_radix(reversed, 16).map_err(|err| {
1700        NostrEventStoreError::Validation(format!(
1701            "invalid reversed timestamp in nostr index key {key}: {err}"
1702        ))
1703    })?;
1704    Ok(u64::MAX - reversed)
1705}
1706
1707fn tag_keys(event: &StoredNostrEvent) -> Vec<String> {
1708    event
1709        .tags
1710        .iter()
1711        .filter_map(|tag| match tag.as_slice() {
1712            [name, value, ..] if !name.is_empty() && !value.is_empty() => {
1713                let normalized_name = name.to_lowercase();
1714                let normalized_value = normalize_tag_value(&normalized_name, value);
1715                Some(format!(
1716                    "{}:{}:{}:{}",
1717                    normalized_name,
1718                    normalized_value,
1719                    reverse_timestamp(event.created_at),
1720                    event.id
1721                ))
1722            }
1723            _ => None,
1724        })
1725        .collect()
1726}
1727
1728fn tag_prefix(tag_name: &str, tag_value: &str) -> Result<String, NostrEventStoreError> {
1729    let normalized_name = normalize_tag_name(tag_name)?;
1730    let normalized_value = normalize_tag_value(&normalized_name, tag_value);
1731    Ok(format!("{normalized_name}:{normalized_value}:"))
1732}
1733
1734fn normalize_tag_name(tag_name: &str) -> Result<String, NostrEventStoreError> {
1735    if tag_name.is_empty() {
1736        return Err(NostrEventStoreError::Validation(
1737            "tag name must be non-empty".to_string(),
1738        ));
1739    }
1740    Ok(tag_name.to_lowercase())
1741}
1742
/// Returns the value to index for a tag: lowercased when `tag_name` is exactly
/// `"t"`, unchanged for every other tag name.
fn normalize_tag_value(tag_name: &str, tag_value: &str) -> String {
    match tag_name {
        "t" => tag_value.to_lowercase(),
        _ => tag_value.to_owned(),
    }
}
1750
1751fn replaceable_key(pubkey: &str, kind: u32) -> String {
1752    format!("{}:{}", pubkey, pad_kind(kind))
1753}
1754
1755fn parameterized_replaceable_key(pubkey: &str, kind: u32, d_tag: &str) -> String {
1756    format!("{}:{}:{}", pubkey, pad_kind(kind), d_tag)
1757}
1758
/// Returns `true` for kinds stored in a single per-author slot: 0, 3, 41 and
/// the range 10000..=19999.
pub fn is_replaceable_kind(kind: u32) -> bool {
    matches!(kind, 0 | 3 | 41 | 10_000..=19_999)
}
1762
/// Returns `true` for kinds in the range 30000..=39999, which are stored in a
/// per-author, per-`d`-tag slot.
pub fn is_parameterized_replaceable_kind(kind: u32) -> bool {
    matches!(kind, 30_000..=39_999)
}
1766
1767fn get_d_tag(event: &StoredNostrEvent) -> Option<String> {
1768    event.tags.iter().find_map(|tag| match tag.as_slice() {
1769        [name, value, ..] if name == "d" && !value.is_empty() => Some(value.clone()),
1770        _ => None,
1771    })
1772}
1773
1774fn parameterized_replaceable_d_tag(event: &StoredNostrEvent) -> String {
1775    get_d_tag(event).unwrap_or_default()
1776}
1777
1778fn compare_events(left: &StoredNostrEvent, right: &StoredNostrEvent) -> i8 {
1779    match left.created_at.cmp(&right.created_at) {
1780        std::cmp::Ordering::Less => -1,
1781        std::cmp::Ordering::Greater => 1,
1782        std::cmp::Ordering::Equal => match left.id.cmp(&right.id) {
1783            std::cmp::Ordering::Less => -1,
1784            std::cmp::Ordering::Greater => 1,
1785            std::cmp::Ordering::Equal => 0,
1786        },
1787    }
1788}
1789
1790fn retain_latest_replaceable_events(events: Vec<StoredNostrEvent>) -> Vec<StoredNostrEvent> {
1791    let mut winners = BTreeMap::<(ReplaceableSlot, String), StoredNostrEvent>::new();
1792    let mut plain = Vec::new();
1793
1794    for event in events {
1795        let slot = if is_replaceable_kind(event.kind) {
1796            Some((
1797                ReplaceableSlot::Replaceable,
1798                replaceable_key(&event.pubkey, event.kind),
1799            ))
1800        } else if is_parameterized_replaceable_kind(event.kind) {
1801            Some((
1802                ReplaceableSlot::Parameterized,
1803                parameterized_replaceable_key(
1804                    &event.pubkey,
1805                    event.kind,
1806                    &parameterized_replaceable_d_tag(&event),
1807                ),
1808            ))
1809        } else {
1810            None
1811        };
1812
1813        if let Some(slot) = slot {
1814            match winners.get(&slot) {
1815                Some(current) if compare_events(&event, current) <= 0 => {}
1816                _ => {
1817                    winners.insert(slot, event);
1818                }
1819            }
1820        } else {
1821            plain.push(event);
1822        }
1823    }
1824
1825    plain.extend(winners.into_values());
1826    plain
1827}