hashtree-cli 0.2.54

Hashtree daemon and CLI - content-addressed storage with P2P sync
Documentation
use super::auth::AppState;
use nostr::{Event, EventId, Filter as NostrFilter};
use nostr_sdk::{ClientBuilder, EventSource};
use std::collections::HashSet;
use std::time::Duration;

const LOCAL_REQUEST_NOSTR_QUERY_TIMEOUT: Duration = Duration::from_secs(4);

pub(super) struct LocalRequestNostrQuery {
    pub(super) local_events: Vec<Event>,
    pub(super) upstream_events: Vec<Event>,
}

impl LocalRequestNostrQuery {
    pub(super) fn merged_events(&self, limit: usize) -> Vec<Event> {
        let mut seen = HashSet::<EventId>::new();
        let mut merged = Vec::new();

        for event in self.local_events.iter().chain(self.upstream_events.iter()) {
            if seen.insert(event.id) {
                merged.push(event.clone());
            }
        }

        merged.sort_by(|left, right| {
            right
                .created_at
                .cmp(&left.created_at)
                .then_with(|| right.id.cmp(&left.id))
        });
        merged.truncate(limit);
        merged
    }
}

fn normalized_limit(filter: &NostrFilter, limit: usize) -> usize {
    match filter.limit {
        Some(filter_limit) => filter_limit.min(limit),
        None => limit,
    }
}

fn is_locally_answerable_id_query(filter: &NostrFilter) -> bool {
    filter.ids.as_ref().is_some_and(|ids| !ids.is_empty())
        && filter.authors.is_none()
        && filter.kinds.is_none()
        && filter.search.is_none()
        && filter.since.is_none()
        && filter.until.is_none()
        && filter.generic_tags.is_empty()
}

fn normalized_relay_urls(relays: &[String]) -> Vec<String> {
    let mut seen = HashSet::new();
    let mut output = Vec::new();

    for relay in relays {
        let normalized = relay.trim();
        if normalized.is_empty() || !seen.insert(normalized.to_string()) {
            continue;
        }
        output.push(normalized.to_string());
    }

    output
}

async fn fetch_upstream_events(relays: &[String], filter: &NostrFilter) -> Vec<Event> {
    if relays.is_empty() {
        return Vec::new();
    }

    let client = ClientBuilder::default().build();
    let mut connected_relays = 0usize;
    for relay in relays {
        if client.add_relay(relay).await.is_ok() {
            connected_relays += 1;
        }
    }
    if connected_relays == 0 {
        return Vec::new();
    }

    client.connect().await;
    let result = client
        .get_events_of(
            vec![filter.clone()],
            EventSource::relays(Some(LOCAL_REQUEST_NOSTR_QUERY_TIMEOUT)),
        )
        .await
        .unwrap_or_default();
    let _ = client.disconnect().await;
    result
}

pub(super) async fn query_events_for_local_request(
    state: &AppState,
    filter: &NostrFilter,
    limit: usize,
) -> LocalRequestNostrQuery {
    let limit = normalized_limit(filter, limit);
    if limit == 0 {
        return LocalRequestNostrQuery {
            local_events: Vec::new(),
            upstream_events: Vec::new(),
        };
    }

    let local_events = match state.nostr_relay.as_ref() {
        Some(relay) => relay.query_events(filter, limit).await,
        None => Vec::new(),
    };
    if is_locally_answerable_id_query(filter) && !local_events.is_empty() {
        return LocalRequestNostrQuery {
            local_events,
            upstream_events: Vec::new(),
        };
    }

    let mut upstream_filter = filter.clone();
    upstream_filter.limit = Some(limit);
    let upstream_events = fetch_upstream_events(
        &normalized_relay_urls(&state.nostr_relay_urls),
        &upstream_filter,
    )
    .await;

    if let Some(relay) = &state.nostr_relay {
        for event in &upstream_events {
            let _ = relay.ingest_trusted_event_silent(event.clone()).await;
        }
    }

    LocalRequestNostrQuery {
        local_events,
        upstream_events,
    }
}