1use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use futures::StreamExt;
10use iroh_gossip::TopicId;
11use iroh_gossip::api::Event;
12use tokio::sync::{Mutex, Semaphore};
13
14use crate::id::{Blake3Hex, NodeIdHex};
15use crate::metadata::FlightMetadata;
16use crate::node::IgcIrohNode;
17use crate::store::{FlatFileStore, IndexRecord, IndexRecordSource};
18use crate::topic::announce_topic_id;
19use crate::util::canonical_utc_now;
20
21#[derive(Debug, thiserror::Error)]
24pub enum IndexerError {
25 #[error("gossip: {0}")]
26 Gossip(String),
27 #[error("store: {0}")]
28 Store(#[from] crate::store::StoreError),
29 #[error("failed to download blob: {0}")]
30 BlobDownload(String),
31 #[error("failed to read downloaded blob: {0}")]
32 BlobRead(String),
33}
34
35#[derive(Debug, thiserror::Error)]
36enum AnnouncementError {
37 #[error("JSON: {0}")]
38 Json(#[from] serde_json::Error),
39 #[error("announcement exceeds 1024-byte limit: {0} bytes")]
40 TooLarge(usize),
41 #[error("invalid {ticket} ticket: {message}")]
42 InvalidTicket {
43 ticket: &'static str,
44 message: String,
45 },
46 #[error("{ticket} ticket hash mismatch")]
47 TicketHashMismatch { ticket: &'static str },
48 #[error("{ticket} ticket node mismatch")]
49 TicketNodeMismatch { ticket: &'static str },
50 #[error("metadata JSON: {0}")]
51 MetadataJson(serde_json::Error),
52 #[error("metadata: {0}")]
53 Metadata(#[from] crate::metadata::MetadataError),
54 #[error("metadata igc_hash mismatch")]
55 MetadataIgcHashMismatch,
56 #[error("meta_hash mismatch")]
57 MetaHashMismatch,
58 #[error("igc_hash mismatch")]
59 IgcHashMismatch,
60}
61
/// Non-error outcome of handling one announcement.
#[derive(Debug)]
enum AnnouncementDisposition {
    /// The announcement was dropped; the string explains why.
    Ignored(String),
    /// An index record was written; `fetched_igc` says whether the raw IGC
    /// blob was downloaded as part of this announcement.
    Indexed {
        fetched_igc: bool,
    },
}
67
68struct ValidatedAnnouncement {
69 ann: Announcement,
70 igc_ticket: iroh_blobs::ticket::BlobTicket,
71 meta_ticket: iroh_blobs::ticket::BlobTicket,
72}
73
74#[derive(Clone)]
75struct IndexerHandle {
76 endpoint: iroh::Endpoint,
77 fs_store: iroh_blobs::store::fs::FsStore,
78 store: Arc<FlatFileStore>,
79}
80
81impl IndexerHandle {
82 fn store(&self) -> &FlatFileStore {
83 self.store.as_ref()
84 }
85}
86
/// Decides whether the raw IGC blob is downloaded after its metadata is verified.
#[derive(Debug, Clone)]
pub enum FetchPolicy {
    /// Never download IGC blobs; index metadata only.
    MetadataOnly,
    /// Always download the IGC blob.
    Eager,
    /// Download the IGC blob only when the flight's bounding box intersects
    /// this box (touching edges count). Flights whose metadata has no
    /// bounding box are not downloaded.
    GeoFiltered {
        min_lat: f64,
        max_lat: f64,
        min_lon: f64,
        max_lon: f64,
    },
}
104
105#[derive(Debug, Clone)]
112pub struct RateLimitConfig {
113 pub blobs_per_hour: u32,
115 pub mb_per_day: f64,
117 pub trusted_node_ids: HashSet<NodeIdHex>,
119}
120
121impl Default for RateLimitConfig {
122 fn default() -> Self {
123 Self {
124 blobs_per_hour: 100,
125 mb_per_day: 200.0,
126 trusted_node_ids: HashSet::new(),
127 }
128 }
129}
130
131#[derive(Debug, Clone)]
135pub struct IndexerConfig {
136 pub policy: FetchPolicy,
138 pub bootstrap: Vec<iroh::PublicKey>,
143 pub rate_limit: Option<RateLimitConfig>,
147}
148
149impl IndexerConfig {
150 pub fn simple(policy: FetchPolicy, bootstrap: Vec<iroh::PublicKey>) -> Self {
152 Self {
153 policy,
154 bootstrap,
155 rate_limit: None,
156 }
157 }
158}
159
/// Rate-limit counters for one publisher.
///
/// Both windows are fixed-length and restarted lazily: the handler resets a
/// counter when it notices the window's start is at least one hour (or one
/// day) in the past.
struct PublisherStats {
    /// Announcements charged in the current hour window.
    blobs_this_hour: u32,
    /// When the current hour window began.
    hour_window_start: Instant,
    /// Bytes recorded in the current day window.
    bytes_today: u64,
    /// When the current day window began.
    day_window_start: Instant,
}

impl Default for PublisherStats {
    /// Zeroed counters with both windows starting now.
    fn default() -> Self {
        let started = Instant::now();
        Self {
            hour_window_start: started,
            day_window_start: started,
            blobs_this_hour: 0,
            bytes_today: 0,
        }
    }
}
180
/// Per-publisher rate-limit counters shared by all announcement handler tasks.
/// Uses the async (tokio) mutex; it is locked with `.lock().await` from handler code.
type RateLimitState = Arc<Mutex<HashMap<NodeIdHex, PublisherStats>>>;

/// Upper bound on announcement handler tasks running at once in `run_indexer`.
const DEFAULT_MAX_CONCURRENT_ANNOUNCEMENTS: usize = 64;
184
185#[derive(Debug, serde::Deserialize)]
188struct Announcement {
189 igc_hash: Blake3Hex,
190 meta_hash: Blake3Hex,
191 #[allow(dead_code)]
192 node_id: NodeIdHex,
193 igc_ticket: String,
194 meta_ticket: String,
195}
196
197pub async fn run_indexer(
209 node: &IgcIrohNode,
210 config: IndexerConfig,
211) -> Result<(), IndexerError> {
212 let topic = TopicId::from_bytes(announce_topic_id());
213 let handle = Arc::new(IndexerHandle {
214 endpoint: node.endpoint.clone(),
215 fs_store: node.fs_store.clone(),
216 store: Arc::clone(&node.store),
217 });
218
219 let mut stream = if config.bootstrap.is_empty() {
224 node.gossip
225 .subscribe(topic, config.bootstrap)
226 .await
227 .map_err(|e| IndexerError::Gossip(e.to_string()))?
228 } else {
229 node.gossip
230 .subscribe_and_join(topic, config.bootstrap)
231 .await
232 .map_err(|e| IndexerError::Gossip(e.to_string()))?
233 };
234
235 tracing::info!("indexer started — listening for flight announcements");
236
237 let rl_state: Option<RateLimitState> = config
238 .rate_limit
239 .as_ref()
240 .map(|_| Arc::new(Mutex::new(HashMap::new())));
241 let permits = Arc::new(Semaphore::new(DEFAULT_MAX_CONCURRENT_ANNOUNCEMENTS));
242
243 while let Some(item) = stream.next().await {
244 let event = match item {
245 Ok(e) => e,
246 Err(e) => {
247 tracing::warn!("gossip stream error: {e}");
248 return Err(IndexerError::Gossip(e.to_string()));
249 }
250 };
251
252 if let Event::Received(msg) = event {
253 let payload = msg.content.clone();
254 let handle = Arc::clone(&handle);
255 let policy = config.policy.clone();
256 let rl_cfg = config.rate_limit.clone();
257 let rl_state = rl_state.clone();
258 let permit = Arc::clone(&permits)
259 .acquire_owned()
260 .await
261 .map_err(|_| IndexerError::Gossip("announcement semaphore closed".to_string()))?;
262 tokio::spawn(async move {
263 let _permit = permit;
264 match handle_announcement(&handle, &payload, &policy, rl_cfg.as_ref(), rl_state)
265 .await
266 {
267 Ok(AnnouncementDisposition::Ignored(reason)) => {
268 tracing::debug!(%reason, "announcement ignored");
269 }
270 Ok(AnnouncementDisposition::Indexed { fetched_igc }) => {
271 tracing::debug!(fetched_igc, "announcement indexed");
272 }
273 Err(e) => {
274 tracing::warn!("announcement handling failed: {e}");
275 }
276 }
277 });
278 }
279 }
280
281 Ok(())
282}
283
284async fn handle_announcement(
287 node: &IndexerHandle,
288 payload: &[u8],
289 policy: &FetchPolicy,
290 rl_cfg: Option<&RateLimitConfig>,
291 rl_state: Option<RateLimitState>,
292) -> Result<AnnouncementDisposition, IndexerError> {
293 if let Err(e) = validate_payload_size(payload) {
294 return Ok(AnnouncementDisposition::Ignored(
295 e.to_string(),
296 ));
297 }
298
299 let ann: Announcement = match serde_json::from_slice(payload) {
301 Ok(a) => a,
302 Err(e) => {
303 return Ok(AnnouncementDisposition::Ignored(format!(
304 "malformed announcement: {e}"
305 )));
306 }
307 };
308
309 let ann = match validate_announcement(ann) {
311 Ok(ann) => ann,
312 Err(e) => return Ok(AnnouncementDisposition::Ignored(e.to_string())),
313 };
314
315 if let (Some(cfg), Some(state)) = (rl_cfg, rl_state.as_ref())
317 && !cfg.trusted_node_ids.contains(&ann.ann.node_id)
318 {
319 let mut map = state.lock().await;
320 let stats = map.entry(ann.ann.node_id.clone()).or_default();
321 let now = Instant::now();
322
323 if now.duration_since(stats.hour_window_start) >= Duration::from_secs(3600) {
325 stats.blobs_this_hour = 0;
326 stats.hour_window_start = now;
327 }
328 if now.duration_since(stats.day_window_start) >= Duration::from_secs(86400) {
330 stats.bytes_today = 0;
331 stats.day_window_start = now;
332 }
333
334 if stats.blobs_this_hour >= cfg.blobs_per_hour {
335 tracing::debug!(
336 node_id = %ann.ann.node_id,
337 limit = cfg.blobs_per_hour,
338 "rate limit exceeded (blobs/hour) — dropping announcement"
339 );
340 return Ok(AnnouncementDisposition::Ignored(
341 "rate limit exceeded (blobs/hour)".to_string(),
342 ));
343 }
344 let mb_today = stats.bytes_today as f64 / (1024.0 * 1024.0);
345 if mb_today >= cfg.mb_per_day {
346 tracing::debug!(
347 node_id = %ann.ann.node_id,
348 limit = cfg.mb_per_day,
349 "rate limit exceeded (MB/day) — dropping announcement"
350 );
351 return Ok(AnnouncementDisposition::Ignored(
352 "rate limit exceeded (MB/day)".to_string(),
353 ));
354 }
355
356 stats.blobs_this_hour += 1;
357 }
358
359 if node
361 .store()
362 .has_index_record(&ann.ann.meta_hash, &ann.ann.node_id)?
363 {
364 tracing::trace!(igc_hash = %ann.ann.igc_hash, "already indexed — skipping");
365 return Ok(AnnouncementDisposition::Ignored(
366 "already indexed".to_string(),
367 ));
368 }
369
370 if node.store().has_meta_hash(&ann.ann.meta_hash)? {
371 record_remote_announcement(node, &ann.ann).await?;
372 tracing::debug!(igc_hash = %ann.ann.igc_hash, node_id = %ann.ann.node_id, "known metadata from new serving peer");
373 return Ok(AnnouncementDisposition::Indexed { fetched_igc: false });
374 }
375
376 tracing::debug!(igc_hash = %ann.ann.igc_hash, "new announcement received");
377
378 let meta_bytes = fetch_blob(node, &ann.meta_ticket).await?;
380
381 let actual_meta_hash = Blake3Hex::from_hash(blake3::hash(&meta_bytes));
383 if actual_meta_hash != ann.ann.meta_hash {
384 tracing::warn!(
385 expected = %ann.ann.meta_hash,
386 actual = %actual_meta_hash,
387 "meta_hash mismatch — discarding"
388 );
389 return Ok(AnnouncementDisposition::Ignored(
390 AnnouncementError::MetaHashMismatch.to_string(),
391 ));
392 }
393
394 let meta: FlightMetadata = match serde_json::from_slice(&meta_bytes) {
396 Ok(m) => m,
397 Err(e) => {
398 return Ok(AnnouncementDisposition::Ignored(
399 AnnouncementError::MetadataJson(e).to_string(),
400 ));
401 }
402 };
403 if let Err(e) = meta.validate() {
404 return Ok(AnnouncementDisposition::Ignored(e.to_string()));
405 }
406 if meta.igc_hash != ann.ann.igc_hash {
407 tracing::warn!(
408 expected = %ann.ann.igc_hash,
409 actual = %meta.igc_hash,
410 "metadata igc_hash mismatch — discarding"
411 );
412 return Ok(AnnouncementDisposition::Ignored(
413 AnnouncementError::MetadataIgcHashMismatch.to_string(),
414 ));
415 }
416
417 node.store().put(&meta_bytes).await?;
419 record_bytes_accepted(&rl_state, &ann.ann.node_id, meta_bytes.len() as u64).await;
421
422 let should_fetch_igc = match policy {
424 FetchPolicy::MetadataOnly => false,
425 FetchPolicy::Eager => true,
426 FetchPolicy::GeoFiltered {
427 min_lat,
428 max_lat,
429 min_lon,
430 max_lon,
431 } => meta.bbox.as_ref().is_some_and(|bb| {
432 bb.max_lat >= *min_lat
433 && bb.min_lat <= *max_lat
434 && bb.max_lon >= *min_lon
435 && bb.min_lon <= *max_lon
436 }),
437 };
438
439 if should_fetch_igc {
440 let igc_bytes = fetch_blob(node, &ann.igc_ticket).await?;
441
442 let actual_igc_hash = Blake3Hex::from_hash(blake3::hash(&igc_bytes));
443 if actual_igc_hash != ann.ann.igc_hash {
444 tracing::warn!(
445 expected = %ann.ann.igc_hash,
446 actual = %actual_igc_hash,
447 "igc_hash mismatch — discarding"
448 );
449 return Ok(AnnouncementDisposition::Ignored(
450 AnnouncementError::IgcHashMismatch.to_string(),
451 ));
452 }
453
454 node.store().put(&igc_bytes).await?;
455 record_bytes_accepted(&rl_state, &ann.ann.node_id, igc_bytes.len() as u64).await;
456 tracing::info!(igc_hash = %ann.ann.igc_hash, "fetched raw IGC blob");
457 }
458
459 record_remote_announcement(node, &ann.ann).await?;
461
462 tracing::info!(igc_hash = %ann.ann.igc_hash, "indexed flight");
463 Ok(AnnouncementDisposition::Indexed {
464 fetched_igc: should_fetch_igc,
465 })
466}
467
468async fn record_bytes_accepted(rl_state: &Option<RateLimitState>, node_id: &NodeIdHex, bytes: u64) {
471 if let Some(state) = rl_state {
472 let mut map = state.lock().await;
473 if let Some(stats) = map.get_mut(node_id) {
474 stats.bytes_today += bytes;
475 }
476 }
477}
478
479fn validate_payload_size(payload: &[u8]) -> Result<(), AnnouncementError> {
480 if payload.len() <= 1024 {
481 Ok(())
482 } else {
483 Err(AnnouncementError::TooLarge(payload.len()))
484 }
485}
486
487fn validate_announcement(ann: Announcement) -> Result<ValidatedAnnouncement, AnnouncementError> {
488 let igc_ticket: iroh_blobs::ticket::BlobTicket =
489 ann.igc_ticket
490 .parse::<iroh_blobs::ticket::BlobTicket>()
491 .map_err(|e| AnnouncementError::InvalidTicket {
492 ticket: "igc",
493 message: e.to_string(),
494 })?;
495 let meta_ticket: iroh_blobs::ticket::BlobTicket =
496 ann.meta_ticket
497 .parse::<iroh_blobs::ticket::BlobTicket>()
498 .map_err(|e| AnnouncementError::InvalidTicket {
499 ticket: "meta",
500 message: e.to_string(),
501 })?;
502
503 if Blake3Hex::from_bytes(igc_ticket.hash().as_bytes()) != ann.igc_hash {
504 return Err(AnnouncementError::TicketHashMismatch { ticket: "igc" });
505 }
506 if Blake3Hex::from_bytes(meta_ticket.hash().as_bytes()) != ann.meta_hash {
507 return Err(AnnouncementError::TicketHashMismatch { ticket: "meta" });
508 }
509 if NodeIdHex::from_public_key(igc_ticket.addr().id) != ann.node_id {
510 return Err(AnnouncementError::TicketNodeMismatch { ticket: "igc" });
511 }
512 if NodeIdHex::from_public_key(meta_ticket.addr().id) != ann.node_id {
513 return Err(AnnouncementError::TicketNodeMismatch { ticket: "meta" });
514 }
515
516 Ok(ValidatedAnnouncement {
517 ann,
518 igc_ticket,
519 meta_ticket,
520 })
521}
522
523async fn record_remote_announcement(
524 node: &IndexerHandle,
525 ann: &Announcement,
526) -> Result<(), crate::store::StoreError> {
527 let _was_appended = node
528 .store()
529 .append_index_if_absent(&IndexRecord {
530 source: IndexRecordSource::RemoteAnnouncement,
531 igc_hash: ann.igc_hash.clone(),
532 meta_hash: ann.meta_hash.clone(),
533 node_id: ann.node_id.clone(),
534 igc_ticket: ann.igc_ticket.clone(),
535 meta_ticket: ann.meta_ticket.clone(),
536 recorded_at: canonical_utc_now(),
537 })
538 .await?;
539 Ok(())
540}
541
542async fn fetch_blob(
544 node: &IndexerHandle,
545 ticket: &iroh_blobs::ticket::BlobTicket,
546) -> Result<Vec<u8>, IndexerError> {
547 let hash = ticket.hash();
548 let peer_id = ticket.addr().id;
549
550 let downloader = node.fs_store.downloader(&node.endpoint);
552 downloader
553 .download(hash, vec![peer_id])
554 .await
555 .map_err(|e| IndexerError::BlobDownload(e.to_string()))?;
556
557 let bytes = node
559 .fs_store
560 .blobs()
561 .get_bytes(hash)
562 .await
563 .map_err(|e| IndexerError::BlobRead(e.to_string()))?;
564
565 Ok(bytes.to_vec())
566}
567
#[cfg(test)]
mod tests {
    use super::*;
    use crate::id::{Blake3Hex, NodeIdHex};

    #[test]
    fn validate_announcement_rejects_unparseable_ticket() {
        let ann = Announcement {
            igc_hash: Blake3Hex::parse("a".repeat(64)).unwrap(),
            meta_hash: Blake3Hex::parse("b".repeat(64)).unwrap(),
            node_id: NodeIdHex::parse("c".repeat(64)).unwrap(),
            igc_ticket: "blob_ticket_placeholder".to_string(),
            meta_ticket: "blob_ticket_placeholder".to_string(),
        };
        // The igc ticket is parsed first, so the placeholder fails there,
        // before any hash or node-id comparison runs.
        assert!(matches!(
            validate_announcement(ann),
            Err(AnnouncementError::InvalidTicket { ticket: "igc", .. })
        ));
    }

    #[test]
    fn deserialize_announcement_rejects_short_igc_hash() {
        let json = format!(
            r#"{{"igc_hash":"abc","meta_hash":"{}","node_id":"{}","igc_ticket":"","meta_ticket":""}}"#,
            "b".repeat(64),
            "c".repeat(64)
        );
        let result: Result<Announcement, _> = serde_json::from_str(&json);
        assert!(result.is_err());
    }

    #[test]
    fn deserialize_announcement_rejects_uppercase_meta_hash() {
        let json = format!(
            r#"{{"igc_hash":"{}","meta_hash":"{}","node_id":"{}","igc_ticket":"","meta_ticket":""}}"#,
            "a".repeat(64),
            "B".repeat(64),
            "c".repeat(64)
        );
        let result: Result<Announcement, _> = serde_json::from_str(&json);
        assert!(result.is_err());
    }

    #[test]
    fn deserialize_announcement_rejects_short_node_id() {
        let json = format!(
            r#"{{"igc_hash":"{}","meta_hash":"{}","node_id":"too_short","igc_ticket":"","meta_ticket":""}}"#,
            "a".repeat(64),
            "b".repeat(64)
        );
        let result: Result<Announcement, _> = serde_json::from_str(&json);
        assert!(result.is_err());
    }

    #[test]
    fn rate_limit_config_default_values() {
        let cfg = RateLimitConfig::default();
        assert_eq!(cfg.blobs_per_hour, 100);
        assert!((cfg.mb_per_day - 200.0).abs() < f64::EPSILON);
        assert!(cfg.trusted_node_ids.is_empty());
    }

    #[test]
    fn malformed_json_is_silently_ignored() {
        let result: Result<Announcement, _> = serde_json::from_slice(b"not json at all");
        assert!(result.is_err());
    }

    #[test]
    fn announcement_missing_required_field_fails_parse() {
        let json = r#"{"igc_hash":"aaaa"}"#;
        let result: Result<Announcement, _> = serde_json::from_slice(json.as_bytes());
        assert!(result.is_err());
    }

    #[test]
    fn oversized_announcement_is_rejected() {
        assert!(matches!(
            validate_payload_size(&[0_u8; 1025]),
            Err(AnnouncementError::TooLarge(1025))
        ));
    }

    #[test]
    fn announcement_at_size_limit_is_accepted() {
        assert!(validate_payload_size(&[0_u8; 1024]).is_ok());
    }
}