// igc_net/indexer.rs
//! Gossip indexer: listen for flight announcements and fetch blobs.
//!
//! Reference indexer for the igc-net publish/announce flow.

// Standard library
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

// External crates
use futures::StreamExt;
use iroh_gossip::TopicId;
use iroh_gossip::api::Event;
use tokio::sync::{Mutex, Semaphore};

// Crate-local
use crate::id::{Blake3Hex, NodeIdHex};
use crate::metadata::FlightMetadata;
use crate::node::IgcIrohNode;
use crate::store::{FlatFileStore, IndexRecord, IndexRecordSource};
use crate::topic::announce_topic_id;
use crate::util::canonical_utc_now;

// ── Error type ────────────────────────────────────────────────────────────────

/// Errors surfaced by [`run_indexer`] and its helpers.
///
/// These indicate local or transport problems; per-announcement validation
/// failures are handled internally and never produce an `IndexerError`.
#[derive(Debug, thiserror::Error)]
pub enum IndexerError {
    /// Gossip subscription or stream failure (stringified upstream error).
    #[error("gossip: {0}")]
    Gossip(String),
    /// Local flat-file store failure.
    #[error("store: {0}")]
    Store(#[from] crate::store::StoreError),
    /// Network download of an announced blob failed.
    #[error("failed to download blob: {0}")]
    BlobDownload(String),
    /// The blob downloaded, but reading it back from the local store failed.
    #[error("failed to read downloaded blob: {0}")]
    BlobRead(String),
}

/// Per-announcement validation failures.
///
/// Every variant causes the offending announcement to be ignored (it becomes
/// an `Ignored(reason)` disposition); the indexer itself keeps running.
#[derive(Debug, thiserror::Error)]
enum AnnouncementError {
    #[error("JSON: {0}")]
    Json(#[from] serde_json::Error),
    #[error("announcement exceeds 1024-byte limit: {0} bytes")]
    TooLarge(usize),
    #[error("invalid {ticket} ticket: {message}")]
    InvalidTicket {
        ticket: &'static str,
        message: String,
    },
    #[error("{ticket} ticket hash mismatch")]
    TicketHashMismatch { ticket: &'static str },
    #[error("{ticket} ticket node mismatch")]
    TicketNodeMismatch { ticket: &'static str },
    #[error("metadata JSON: {0}")]
    MetadataJson(serde_json::Error),
    #[error("metadata: {0}")]
    Metadata(#[from] crate::metadata::MetadataError),
    #[error("metadata igc_hash mismatch")]
    MetadataIgcHashMismatch,
    #[error("meta_hash mismatch")]
    MetaHashMismatch,
    #[error("igc_hash mismatch")]
    IgcHashMismatch,
}

/// Outcome of processing one announcement, used for logging.
#[derive(Debug)]
enum AnnouncementDisposition {
    /// Announcement dropped; the string is a human-readable reason.
    Ignored(String),
    /// Metadata recorded; `fetched_igc` reports whether the raw IGC blob was
    /// also downloaded (depends on the active [`FetchPolicy`]).
    Indexed { fetched_igc: bool },
}

/// An [`Announcement`] whose tickets parsed successfully and whose ticket
/// hash/node fields agree with the announced `igc_hash`, `meta_hash`, and
/// `node_id` (see [`validate_announcement`]).
struct ValidatedAnnouncement {
    ann: Announcement,
    igc_ticket: iroh_blobs::ticket::BlobTicket,
    meta_ticket: iroh_blobs::ticket::BlobTicket,
}

/// Shared state handed to each spawned announcement task.
///
/// All fields are cheaply cloneable handles, so the struct is `Clone` and is
/// wrapped in an `Arc` by [`run_indexer`].
#[derive(Clone)]
struct IndexerHandle {
    endpoint: iroh::Endpoint,
    fs_store: iroh_blobs::store::fs::FsStore,
    store: Arc<FlatFileStore>,
}

81impl IndexerHandle {
82    fn store(&self) -> &FlatFileStore {
83        self.store.as_ref()
84    }
85}
86
// ── FetchPolicy ───────────────────────────────────────────────────────────────

/// Determines what the indexer stores after receiving an announcement.
#[derive(Debug, Clone)]
pub enum FetchPolicy {
    /// Store only the metadata JSON blob; fetch the raw IGC on explicit request.
    MetadataOnly,
    /// Fetch and store every announced raw IGC blob.
    Eager,
    /// Fetch only flights whose bbox overlaps this geographic region.
    ///
    /// Bounds are compared directly against the metadata's bbox fields;
    /// presumably decimal degrees — confirm against `FlightMetadata::bbox`.
    GeoFiltered {
        min_lat: f64,
        max_lat: f64,
        min_lon: f64,
        max_lon: f64,
    },
}

// ── RateLimitConfig ───────────────────────────────────────────────────────────

/// Per-source flood protection for inbound gossip indexing.
///
/// Limits apply per unknown `publisher_node_id` on rolling windows.
/// Trusted nodes (listed in `trusted_node_ids`) bypass all limits.
#[derive(Debug, Clone)]
pub struct RateLimitConfig {
    /// Maximum number of announcements accepted per publisher per rolling hour.
    pub blobs_per_hour: u32,
    /// Maximum total megabytes accepted per publisher per rolling 24 hours.
    pub mb_per_day: f64,
    /// Node IDs exempt from all rate limits.
    pub trusted_node_ids: HashSet<NodeIdHex>,
}

121impl Default for RateLimitConfig {
122    fn default() -> Self {
123        Self {
124            blobs_per_hour: 100,
125            mb_per_day: 200.0,
126            trusted_node_ids: HashSet::new(),
127        }
128    }
129}
130
// ── IndexerConfig ─────────────────────────────────────────────────────────────

/// Configuration bundle for [`run_indexer`].
#[derive(Debug, Clone)]
pub struct IndexerConfig {
    /// Blob fetch policy applied to each accepted announcement.
    pub policy: FetchPolicy,
    /// Known bootstrap peers (iroh public keys) to seed the gossip swarm.
    ///
    /// Empty in production (relay-based discovery); populate in integration
    /// tests for loopback connections without relay infrastructure.
    pub bootstrap: Vec<iroh::PublicKey>,
    /// Optional per-source flood protection.
    ///
    /// `None` disables rate limiting (all announcements are accepted).
    pub rate_limit: Option<RateLimitConfig>,
}

149impl IndexerConfig {
150    /// Convenience constructor: policy + bootstrap, no rate limiting.
151    pub fn simple(policy: FetchPolicy, bootstrap: Vec<iroh::PublicKey>) -> Self {
152        Self {
153            policy,
154            bootstrap,
155            rate_limit: None,
156        }
157    }
158}
159
// ── Internal rate-limit state ─────────────────────────────────────────────────

/// Rolling-window counters for a single publisher node.
struct PublisherStats {
    // Announcements accepted since `hour_window_start`.
    blobs_this_hour: u32,
    // Start of the current one-hour window.
    hour_window_start: Instant,
    // Bytes accepted since `day_window_start`.
    bytes_today: u64,
    // Start of the current 24-hour window.
    day_window_start: Instant,
}

169impl Default for PublisherStats {
170    fn default() -> Self {
171        let now = Instant::now();
172        Self {
173            blobs_this_hour: 0,
174            hour_window_start: now,
175            bytes_today: 0,
176            day_window_start: now,
177        }
178    }
179}
180
/// Shared map of per-publisher counters, keyed by announced `node_id`.
type RateLimitState = Arc<Mutex<HashMap<NodeIdHex, PublisherStats>>>;

/// Upper bound on concurrently-processed announcements (semaphore permits).
const DEFAULT_MAX_CONCURRENT_ANNOUNCEMENTS: usize = 64;

// ── Wire format ───────────────────────────────────────────────────────────────

/// Incoming announcement payload (JSON) as published on the gossip topic.
#[derive(Debug, serde::Deserialize)]
struct Announcement {
    /// BLAKE3 hash of the raw IGC blob (verified after download).
    igc_hash: Blake3Hex,
    /// BLAKE3 hash of the metadata JSON blob (verified after download).
    meta_hash: Blake3Hex,
    // NOTE(review): node_id is read throughout this module; the
    // allow(dead_code) attribute looks stale — confirm and consider removing.
    #[allow(dead_code)]
    node_id: NodeIdHex,
    /// Serialized `BlobTicket` for the raw IGC blob.
    igc_ticket: String,
    /// Serialized `BlobTicket` for the metadata blob.
    meta_ticket: String,
}

197// ── run_indexer() ─────────────────────────────────────────────────────────────
198
199/// Subscribe to the announce gossip topic and process incoming announcements.
200///
201/// Runs until the node shuts down or an unrecoverable gossip error occurs.
202/// Each announcement is processed in a spawned task so the gossip loop is
203/// never blocked by network fetches.
204///
205/// Pass [`IndexerConfig::simple`] for tests or production without rate limiting.
206/// Use a full [`IndexerConfig`] with `rate_limit` set to enable per-source
207/// flood protection in reference node deployments.
208pub async fn run_indexer(
209    node: &IgcIrohNode,
210    config: IndexerConfig,
211) -> Result<(), IndexerError> {
212    let topic = TopicId::from_bytes(announce_topic_id());
213    let handle = Arc::new(IndexerHandle {
214        endpoint: node.endpoint.clone(),
215        fs_store: node.fs_store.clone(),
216        store: Arc::clone(&node.store),
217    });
218
219    // When bootstrap peers are given (e.g., in integration tests), wait until
220    // at least one peer has joined before starting the event loop.  With an
221    // empty bootstrap list (production), return immediately and rely on peers
222    // discovering us via the relay.
223    let mut stream = if config.bootstrap.is_empty() {
224        node.gossip
225            .subscribe(topic, config.bootstrap)
226            .await
227            .map_err(|e| IndexerError::Gossip(e.to_string()))?
228    } else {
229        node.gossip
230            .subscribe_and_join(topic, config.bootstrap)
231            .await
232            .map_err(|e| IndexerError::Gossip(e.to_string()))?
233    };
234
235    tracing::info!("indexer started — listening for flight announcements");
236
237    let rl_state: Option<RateLimitState> = config
238        .rate_limit
239        .as_ref()
240        .map(|_| Arc::new(Mutex::new(HashMap::new())));
241    let permits = Arc::new(Semaphore::new(DEFAULT_MAX_CONCURRENT_ANNOUNCEMENTS));
242
243    while let Some(item) = stream.next().await {
244        let event = match item {
245            Ok(e) => e,
246            Err(e) => {
247                tracing::warn!("gossip stream error: {e}");
248                return Err(IndexerError::Gossip(e.to_string()));
249            }
250        };
251
252        if let Event::Received(msg) = event {
253            let payload = msg.content.clone();
254            let handle = Arc::clone(&handle);
255            let policy = config.policy.clone();
256            let rl_cfg = config.rate_limit.clone();
257            let rl_state = rl_state.clone();
258            let permit = Arc::clone(&permits)
259                .acquire_owned()
260                .await
261                .map_err(|_| IndexerError::Gossip("announcement semaphore closed".to_string()))?;
262            tokio::spawn(async move {
263                let _permit = permit;
264                match handle_announcement(&handle, &payload, &policy, rl_cfg.as_ref(), rl_state)
265                    .await
266                {
267                    Ok(AnnouncementDisposition::Ignored(reason)) => {
268                        tracing::debug!(%reason, "announcement ignored");
269                    }
270                    Ok(AnnouncementDisposition::Indexed { fetched_igc }) => {
271                        tracing::debug!(fetched_igc, "announcement indexed");
272                    }
273                    Err(e) => {
274                        tracing::warn!("announcement handling failed: {e}");
275                    }
276                }
277            });
278        }
279    }
280
281    Ok(())
282}
283
// ── Internal announcement handling ───────────────────────────────────────────

/// Validate, rate-limit, dedup, fetch, and index a single announcement.
///
/// Per-message problems (oversized/malformed payload, failed validation,
/// rate-limit hits, duplicates, hash mismatches) are reported as
/// `Ok(Ignored(reason))` so the caller can log and move on; `Err` is
/// reserved for store failures and blob download/read failures.
async fn handle_announcement(
    node: &IndexerHandle,
    payload: &[u8],
    policy: &FetchPolicy,
    rl_cfg: Option<&RateLimitConfig>,
    rl_state: Option<RateLimitState>,
) -> Result<AnnouncementDisposition, IndexerError> {
    // Cheap size gate before any parsing work.
    if let Err(e) = validate_payload_size(payload) {
        return Ok(AnnouncementDisposition::Ignored(
            e.to_string(),
        ));
    }

    // ── 1. Parse JSON ─────────────────────────────────────────────────────────
    let ann: Announcement = match serde_json::from_slice(payload) {
        Ok(a) => a,
        Err(e) => {
            return Ok(AnnouncementDisposition::Ignored(format!(
                "malformed announcement: {e}"
            )));
        }
    };

    // ── 2. Validate announcement invariants ───────────────────────────────────
    // Parses both tickets and checks they agree with the announced hashes and
    // node_id; from here on `ann` is a ValidatedAnnouncement.
    let ann = match validate_announcement(ann) {
        Ok(ann) => ann,
        Err(e) => return Ok(AnnouncementDisposition::Ignored(e.to_string())),
    };

    // ── 3. Rate limit ─────────────────────────────────────────────────────────
    // Applied only when both a config and shared state exist, and only to
    // publishers outside the trusted set.
    if let (Some(cfg), Some(state)) = (rl_cfg, rl_state.as_ref())
        && !cfg.trusted_node_ids.contains(&ann.ann.node_id)
    {
        let mut map = state.lock().await;
        let stats = map.entry(ann.ann.node_id.clone()).or_default();
        let now = Instant::now();

        // Slide the hour window
        if now.duration_since(stats.hour_window_start) >= Duration::from_secs(3600) {
            stats.blobs_this_hour = 0;
            stats.hour_window_start = now;
        }
        // Slide the day window
        if now.duration_since(stats.day_window_start) >= Duration::from_secs(86400) {
            stats.bytes_today = 0;
            stats.day_window_start = now;
        }

        if stats.blobs_this_hour >= cfg.blobs_per_hour {
            tracing::debug!(
                node_id = %ann.ann.node_id,
                limit = cfg.blobs_per_hour,
                "rate limit exceeded (blobs/hour) — dropping announcement"
            );
            return Ok(AnnouncementDisposition::Ignored(
                "rate limit exceeded (blobs/hour)".to_string(),
            ));
        }
        let mb_today = stats.bytes_today as f64 / (1024.0 * 1024.0);
        if mb_today >= cfg.mb_per_day {
            tracing::debug!(
                node_id = %ann.ann.node_id,
                limit = cfg.mb_per_day,
                "rate limit exceeded (MB/day) — dropping announcement"
            );
            return Ok(AnnouncementDisposition::Ignored(
                "rate limit exceeded (MB/day)".to_string(),
            ));
        }

        // Counted before the dedup check below, so duplicate announcements
        // still consume quota.
        stats.blobs_this_hour += 1;
    }

    // ── 4. Dedup ──────────────────────────────────────────────────────────────
    // Exact (meta_hash, node_id) pair already indexed: nothing to do.
    if node
        .store()
        .has_index_record(&ann.ann.meta_hash, &ann.ann.node_id)?
    {
        tracing::trace!(igc_hash = %ann.ann.igc_hash, "already indexed — skipping");
        return Ok(AnnouncementDisposition::Ignored(
            "already indexed".to_string(),
        ));
    }

    // Metadata already known but announced by a new peer: just record the
    // additional source, no downloads needed.
    if node.store().has_meta_hash(&ann.ann.meta_hash)? {
        record_remote_announcement(node, &ann.ann).await?;
        tracing::debug!(igc_hash = %ann.ann.igc_hash, node_id = %ann.ann.node_id, "known metadata from new serving peer");
        return Ok(AnnouncementDisposition::Indexed { fetched_igc: false });
    }

    tracing::debug!(igc_hash = %ann.ann.igc_hash, "new announcement received");

    // ── 5. Fetch metadata blob ────────────────────────────────────────────────
    let meta_bytes = fetch_blob(node, &ann.meta_ticket).await?;

    // Verify hash
    let actual_meta_hash = Blake3Hex::from_hash(blake3::hash(&meta_bytes));
    if actual_meta_hash != ann.ann.meta_hash {
        tracing::warn!(
            expected = %ann.ann.meta_hash,
            actual   = %actual_meta_hash,
            "meta_hash mismatch — discarding"
        );
        return Ok(AnnouncementDisposition::Ignored(
            AnnouncementError::MetaHashMismatch.to_string(),
        ));
    }

    // ── 6. Validate metadata ──────────────────────────────────────────────────
    let meta: FlightMetadata = match serde_json::from_slice(&meta_bytes) {
        Ok(m) => m,
        Err(e) => {
            return Ok(AnnouncementDisposition::Ignored(
                AnnouncementError::MetadataJson(e).to_string(),
            ));
        }
    };
    if let Err(e) = meta.validate() {
        return Ok(AnnouncementDisposition::Ignored(e.to_string()));
    }
    // The metadata must reference the same raw IGC blob as the announcement.
    if meta.igc_hash != ann.ann.igc_hash {
        tracing::warn!(
            expected = %ann.ann.igc_hash,
            actual = %meta.igc_hash,
            "metadata igc_hash mismatch — discarding"
        );
        return Ok(AnnouncementDisposition::Ignored(
            AnnouncementError::MetadataIgcHashMismatch.to_string(),
        ));
    }

    // ── 7. Store metadata blob ────────────────────────────────────────────────
    // Note: metadata is stored before the fetch policy is applied, so even
    // MetadataOnly deployments keep the metadata JSON.
    node.store().put(&meta_bytes).await?;
    // Track bytes accepted from this publisher for the MB/day rate limit.
    record_bytes_accepted(&rl_state, &ann.ann.node_id, meta_bytes.len() as u64).await;

    // ── 8. Apply fetch policy ─────────────────────────────────────────────────
    // GeoFiltered: fetch when the flight bbox intersects the configured
    // region; flights without a bbox are never fetched.
    let should_fetch_igc = match policy {
        FetchPolicy::MetadataOnly => false,
        FetchPolicy::Eager => true,
        FetchPolicy::GeoFiltered {
            min_lat,
            max_lat,
            min_lon,
            max_lon,
        } => meta.bbox.as_ref().is_some_and(|bb| {
            bb.max_lat >= *min_lat
                && bb.min_lat <= *max_lat
                && bb.max_lon >= *min_lon
                && bb.min_lon <= *max_lon
        }),
    };

    if should_fetch_igc {
        let igc_bytes = fetch_blob(node, &ann.igc_ticket).await?;

        let actual_igc_hash = Blake3Hex::from_hash(blake3::hash(&igc_bytes));
        if actual_igc_hash != ann.ann.igc_hash {
            tracing::warn!(
                expected = %ann.ann.igc_hash,
                actual   = %actual_igc_hash,
                "igc_hash mismatch — discarding"
            );
            return Ok(AnnouncementDisposition::Ignored(
                AnnouncementError::IgcHashMismatch.to_string(),
            ));
        }

        node.store().put(&igc_bytes).await?;
        record_bytes_accepted(&rl_state, &ann.ann.node_id, igc_bytes.len() as u64).await;
        tracing::info!(igc_hash = %ann.ann.igc_hash, "fetched raw IGC blob");
    }

    // ── 9. Append source record ───────────────────────────────────────────────
    record_remote_announcement(node, &ann.ann).await?;

    tracing::info!(igc_hash = %ann.ann.igc_hash, "indexed flight");
    Ok(AnnouncementDisposition::Indexed {
        fetched_igc: should_fetch_igc,
    })
}

468/// Accumulate bytes into the rate-limit state for a publisher.  No-op when
469/// rate limiting is disabled (`rl_state` is `None`).
470async fn record_bytes_accepted(rl_state: &Option<RateLimitState>, node_id: &NodeIdHex, bytes: u64) {
471    if let Some(state) = rl_state {
472        let mut map = state.lock().await;
473        if let Some(stats) = map.get_mut(node_id) {
474            stats.bytes_today += bytes;
475        }
476    }
477}
478
479fn validate_payload_size(payload: &[u8]) -> Result<(), AnnouncementError> {
480    if payload.len() <= 1024 {
481        Ok(())
482    } else {
483        Err(AnnouncementError::TooLarge(payload.len()))
484    }
485}
486
487fn validate_announcement(ann: Announcement) -> Result<ValidatedAnnouncement, AnnouncementError> {
488    let igc_ticket: iroh_blobs::ticket::BlobTicket =
489        ann.igc_ticket
490            .parse::<iroh_blobs::ticket::BlobTicket>()
491            .map_err(|e| AnnouncementError::InvalidTicket {
492                ticket: "igc",
493                message: e.to_string(),
494            })?;
495    let meta_ticket: iroh_blobs::ticket::BlobTicket =
496        ann.meta_ticket
497            .parse::<iroh_blobs::ticket::BlobTicket>()
498            .map_err(|e| AnnouncementError::InvalidTicket {
499                ticket: "meta",
500                message: e.to_string(),
501            })?;
502
503    if Blake3Hex::from_bytes(igc_ticket.hash().as_bytes()) != ann.igc_hash {
504        return Err(AnnouncementError::TicketHashMismatch { ticket: "igc" });
505    }
506    if Blake3Hex::from_bytes(meta_ticket.hash().as_bytes()) != ann.meta_hash {
507        return Err(AnnouncementError::TicketHashMismatch { ticket: "meta" });
508    }
509    if NodeIdHex::from_public_key(igc_ticket.addr().id) != ann.node_id {
510        return Err(AnnouncementError::TicketNodeMismatch { ticket: "igc" });
511    }
512    if NodeIdHex::from_public_key(meta_ticket.addr().id) != ann.node_id {
513        return Err(AnnouncementError::TicketNodeMismatch { ticket: "meta" });
514    }
515
516    Ok(ValidatedAnnouncement {
517        ann,
518        igc_ticket,
519        meta_ticket,
520    })
521}
522
523async fn record_remote_announcement(
524    node: &IndexerHandle,
525    ann: &Announcement,
526) -> Result<(), crate::store::StoreError> {
527    let _was_appended = node
528        .store()
529        .append_index_if_absent(&IndexRecord {
530            source: IndexRecordSource::RemoteAnnouncement,
531            igc_hash: ann.igc_hash.clone(),
532            meta_hash: ann.meta_hash.clone(),
533            node_id: ann.node_id.clone(),
534            igc_ticket: ann.igc_ticket.clone(),
535            meta_ticket: ann.meta_ticket.clone(),
536            recorded_at: canonical_utc_now(),
537        })
538        .await?;
539    Ok(())
540}
541
542/// Download a blob from the network using a serialised `BlobTicket`.
543async fn fetch_blob(
544    node: &IndexerHandle,
545    ticket: &iroh_blobs::ticket::BlobTicket,
546) -> Result<Vec<u8>, IndexerError> {
547    let hash = ticket.hash();
548    let peer_id = ticket.addr().id;
549
550    // Download into our iroh-blobs store, using the peer as the provider.
551    let downloader = node.fs_store.downloader(&node.endpoint);
552    downloader
553        .download(hash, vec![peer_id])
554        .await
555        .map_err(|e| IndexerError::BlobDownload(e.to_string()))?;
556
557    // Read the bytes back from the local store.
558    let bytes = node
559        .fs_store
560        .blobs()
561        .get_bytes(hash)
562        .await
563        .map_err(|e| IndexerError::BlobRead(e.to_string()))?;
564
565    Ok(bytes.to_vec())
566}
567
#[cfg(test)]
mod tests {
    // `use super::*` already brings the parent's imports (Blake3Hex,
    // NodeIdHex, …) into scope; the previous explicit re-import of
    // `crate::id` was redundant and has been removed.
    use super::*;

    #[test]
    fn validate_announcement_rejects_ticket_hash_mismatch() {
        let ann = Announcement {
            igc_hash: Blake3Hex::parse("a".repeat(64)).unwrap(),
            meta_hash: Blake3Hex::parse("b".repeat(64)).unwrap(),
            node_id: NodeIdHex::parse("c".repeat(64)).unwrap(),
            igc_ticket: "blob_ticket_placeholder".to_string(),
            meta_ticket: "blob_ticket_placeholder".to_string(),
        };
        assert!(validate_announcement(ann).is_err());
    }

    #[test]
    fn deserialize_announcement_rejects_short_igc_hash() {
        let json = format!(
            r#"{{"igc_hash":"abc","meta_hash":"{}","node_id":"{}","igc_ticket":"","meta_ticket":""}}"#,
            "b".repeat(64),
            "c".repeat(64)
        );
        let result: Result<Announcement, _> = serde_json::from_str(&json);
        assert!(result.is_err());
    }

    #[test]
    fn deserialize_announcement_rejects_uppercase_meta_hash() {
        let json = format!(
            r#"{{"igc_hash":"{}","meta_hash":"{}","node_id":"{}","igc_ticket":"","meta_ticket":""}}"#,
            "a".repeat(64),
            "B".repeat(64),
            "c".repeat(64)
        );
        let result: Result<Announcement, _> = serde_json::from_str(&json);
        assert!(result.is_err());
    }

    #[test]
    fn deserialize_announcement_rejects_short_node_id() {
        let json = format!(
            r#"{{"igc_hash":"{}","meta_hash":"{}","node_id":"too_short","igc_ticket":"","meta_ticket":""}}"#,
            "a".repeat(64),
            "b".repeat(64)
        );
        let result: Result<Announcement, _> = serde_json::from_str(&json);
        assert!(result.is_err());
    }

    #[test]
    fn rate_limit_config_default_values() {
        let cfg = RateLimitConfig::default();
        assert_eq!(cfg.blobs_per_hour, 100);
        assert!((cfg.mb_per_day - 200.0).abs() < f64::EPSILON);
        assert!(cfg.trusted_node_ids.is_empty());
    }

    #[test]
    fn malformed_json_is_silently_ignored() {
        // Verify that serde_json::from_slice fails gracefully for non-JSON.
        let result: Result<Announcement, _> = serde_json::from_slice(b"not json at all");
        assert!(result.is_err());
    }

    #[test]
    fn announcement_missing_required_field_fails_parse() {
        // JSON with igc_hash but missing meta_hash, node_id, tickets.
        let json = r#"{"igc_hash":"aaaa"}"#;
        let result: Result<Announcement, _> = serde_json::from_slice(json.as_bytes());
        assert!(result.is_err());
    }

    #[test]
    fn payload_at_limit_is_accepted() {
        // Boundary case: exactly 1024 bytes is still within the limit.
        assert!(validate_payload_size(&[0_u8; 1024]).is_ok());
    }

    #[test]
    fn oversized_announcement_is_rejected() {
        // A stack array avoids the clippy::useless_vec heap allocation the
        // previous `&vec![0_u8; 1025]` incurred.
        assert!(matches!(
            validate_payload_size(&[0_u8; 1025]),
            Err(AnnouncementError::TooLarge(1025))
        ));
    }
}