1#![allow(unexpected_cfgs)]
2mod api;
17mod config;
18pub mod descriptor;
19mod mapping;
20
21pub use config::{CategoryConfig, CloudflareRadarBootstrapConfig};
22
23use anyhow::{anyhow, Result};
24use async_trait::async_trait;
25use chrono::Utc;
26use drasi_lib::bootstrap::{
27 BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
28};
29use drasi_lib::channels::{BootstrapEvent, BootstrapEventSender};
30use log::{info, warn};
31use mapping::{
32 map_attack_l3_summary, map_attack_l7_summary, map_dns_summary, map_domain_ranking, map_hijack,
33 map_http_summary, map_leak, map_outage, normalize_id,
34};
35use reqwest::Client;
36use serde::de::DeserializeOwned;
37use std::collections::HashSet;
38use std::sync::atomic::Ordering;
39use tokio::time::{sleep, Duration};
40
41use crate::api::{
42 AttackLayer3SummaryResult, AttackSummaryResult, BgpHijackResult, BgpLeakResult,
43 CloudflareResponse, DnsTopLocationsResult, HttpSummaryResult, OutageResult, RankingResult,
44};
45
/// Bootstrap provider that loads data from the Cloudflare Radar HTTP API.
///
/// Instances are created through [`CloudflareRadarBootstrapProvider::builder`];
/// the builder's `build` step validates the configuration and constructs the
/// HTTP client.
pub struct CloudflareRadarBootstrapProvider {
    /// Settings selecting which categories are fetched and how they are filtered.
    config: CloudflareRadarBootstrapConfig,
    /// HTTP client used for every API request issued by this provider.
    client: Client,
}
50
51impl CloudflareRadarBootstrapProvider {
52 pub fn builder() -> CloudflareRadarBootstrapProviderBuilder {
53 CloudflareRadarBootstrapProviderBuilder::new()
54 }
55}
56
/// Fluent builder for [`CloudflareRadarBootstrapProvider`].
///
/// Starts from `CloudflareRadarBootstrapConfig::default()` and is adjusted
/// through the `with_*` methods; `build()` validates the result.
pub struct CloudflareRadarBootstrapProviderBuilder {
    /// Configuration accumulated so far.
    config: CloudflareRadarBootstrapConfig,
}
60
61impl Default for CloudflareRadarBootstrapProviderBuilder {
62 fn default() -> Self {
63 Self::new()
64 }
65}
66
67impl CloudflareRadarBootstrapProviderBuilder {
68 pub fn new() -> Self {
69 Self {
70 config: CloudflareRadarBootstrapConfig::default(),
71 }
72 }
73
74 pub fn with_api_token(mut self, token: impl Into<String>) -> Self {
75 self.config.api_token = token.into();
76 self
77 }
78
79 pub fn with_api_base_url(mut self, base_url: impl Into<String>) -> Self {
80 self.config.api_base_url = base_url.into();
81 self
82 }
83
84 pub fn with_category(mut self, name: &str, enabled: bool) -> Self {
85 match name {
86 "outages" => self.config.categories.outages = enabled,
87 "bgp_hijacks" => self.config.categories.bgp_hijacks = enabled,
88 "bgp_leaks" => self.config.categories.bgp_leaks = enabled,
89 "http_traffic" => self.config.categories.http_traffic = enabled,
90 "attacks_l7" => self.config.categories.attacks_l7 = enabled,
91 "attacks_l3" => self.config.categories.attacks_l3 = enabled,
92 "domain_rankings" => self.config.categories.domain_rankings = enabled,
93 "dns" => self.config.categories.dns = enabled,
94 _ => {
95 log::warn!("Unknown Cloudflare Radar category: '{name}'");
96 }
97 }
98 self
99 }
100
101 pub fn with_location_filter(mut self, locations: Vec<String>) -> Self {
102 self.config.location_filter = Some(locations);
103 self
104 }
105
106 pub fn with_bgp_asn_filter(mut self, asns: Vec<u32>) -> Self {
107 self.config.asn_filter = Some(asns);
108 self
109 }
110
111 pub fn with_hijack_min_confidence(mut self, min_confidence: u32) -> Self {
112 self.config.hijack_min_confidence = Some(min_confidence);
113 self
114 }
115
116 pub fn with_ranking_limit(mut self, limit: u32) -> Self {
117 self.config.ranking_limit = limit;
118 self
119 }
120
121 pub fn with_dns_domains(mut self, domains: Vec<String>) -> Self {
122 self.config.dns_domains = Some(domains);
123 self
124 }
125
126 pub fn with_analytics_date_range(mut self, range: impl Into<String>) -> Self {
127 self.config.analytics_date_range = range.into();
128 self
129 }
130
131 pub fn with_event_date_range(mut self, range: impl Into<String>) -> Self {
132 self.config.event_date_range = range.into();
133 self
134 }
135
136 pub fn build(self) -> Result<CloudflareRadarBootstrapProvider> {
137 if self.config.api_token.trim().is_empty() {
138 return Err(anyhow!("Cloudflare Radar API token is required"));
139 }
140 if self.config.categories.dns
141 && self
142 .config
143 .dns_domains
144 .as_ref()
145 .map(|d| d.is_empty())
146 .unwrap_or(true)
147 {
148 return Err(anyhow!("DNS category enabled but no dns_domains provided"));
149 }
150 if !self.config.categories.outages
151 && !self.config.categories.bgp_hijacks
152 && !self.config.categories.bgp_leaks
153 && !self.config.categories.http_traffic
154 && !self.config.categories.attacks_l7
155 && !self.config.categories.attacks_l3
156 && !self.config.categories.domain_rankings
157 && !self.config.categories.dns
158 {
159 return Err(anyhow!("At least one category must be enabled"));
160 }
161
162 let client = build_client(&self.config.api_token)?;
163 Ok(CloudflareRadarBootstrapProvider {
164 config: self.config,
165 client,
166 })
167 }
168}
169
170#[async_trait]
171impl BootstrapProvider for CloudflareRadarBootstrapProvider {
172 async fn bootstrap(
173 &self,
174 request: BootstrapRequest,
175 context: &BootstrapContext,
176 event_tx: BootstrapEventSender,
177 _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
178 ) -> Result<BootstrapResult> {
179 info!(
180 "[{}] Cloudflare Radar bootstrap started for query {}",
181 context.source_id, request.query_id
182 );
183
184 let mut sent_ids: HashSet<String> = HashSet::new();
185 let mut total_events = 0usize;
186
187 if self.config.categories.outages
188 && query_requests_any(
189 &request,
190 &["Outage", "Location", "AutonomousSystem"],
191 &["AFFECTS_LOCATION", "AFFECTS_ASN"],
192 )
193 {
194 let url = build_url(
195 &self.config.api_base_url,
196 "/radar/annotations/outages",
197 &[
198 ("limit", "50".to_string()),
199 ("dateRange", self.config.event_date_range.clone()),
200 ("format", "json".to_string()),
201 ],
202 );
203 let result: OutageResult = fetch_cloudflare(&self.client, &url).await?;
204 for outage in result.annotations.iter() {
205 if !outage_matches_filters(
206 outage,
207 self.config.location_filter.as_ref(),
208 self.config.asn_filter.as_ref(),
209 ) {
210 continue;
211 }
212 let outage_id = compute_outage_id(outage);
213 let changes = map_outage(&context.source_id, &outage_id, outage, true);
214 total_events += send_changes(
215 &context.source_id,
216 &event_tx,
217 &context.sequence_counter,
218 changes,
219 &mut sent_ids,
220 )
221 .await?;
222 }
223 }
224
225 if self.config.categories.bgp_hijacks
226 && query_requests_any(
227 &request,
228 &["BgpHijack", "AutonomousSystem", "Prefix"],
229 &["HIJACKED_BY", "VICTIM_ASN", "TARGETS_PREFIX"],
230 )
231 {
232 let asns = self.config.asn_filter.clone().unwrap_or_default();
233 let mut targets = vec![];
234 if asns.is_empty() {
235 targets.push(None);
236 } else {
237 for asn in asns {
238 targets.push(Some(asn));
239 }
240 }
241
242 for target_asn in targets {
243 let mut params = vec![
244 ("per_page", "50".to_string()),
245 ("format", "json".to_string()),
246 ];
247 if let Some(asn) = target_asn {
248 params.push(("involvedAsn", asn.to_string()));
249 }
250 if let Some(min_conf) = self.config.hijack_min_confidence {
251 params.push(("minConfidence", min_conf.to_string()));
252 }
253 let url = build_url(
254 &self.config.api_base_url,
255 "/radar/bgp/hijacks/events",
256 ¶ms,
257 );
258 let result: BgpHijackResult = fetch_cloudflare(&self.client, &url).await?;
259 for event in result.events.iter() {
260 let changes = map_hijack(&context.source_id, event, true);
261 total_events += send_changes(
262 &context.source_id,
263 &event_tx,
264 &context.sequence_counter,
265 changes,
266 &mut sent_ids,
267 )
268 .await?;
269 }
270 }
271 }
272
273 if self.config.categories.bgp_leaks
274 && query_requests_any(&request, &["BgpLeak", "AutonomousSystem"], &["LEAKED_BY"])
275 {
276 let asns = self.config.asn_filter.clone().unwrap_or_default();
277 let mut targets = vec![];
278 if asns.is_empty() {
279 targets.push(None);
280 } else {
281 for asn in asns {
282 targets.push(Some(asn));
283 }
284 }
285
286 for target_asn in targets {
287 let mut params = vec![
288 ("per_page", "50".to_string()),
289 ("format", "json".to_string()),
290 ];
291 if let Some(asn) = target_asn {
292 params.push(("involvedAsn", asn.to_string()));
293 }
294 let url = build_url(
295 &self.config.api_base_url,
296 "/radar/bgp/leaks/events",
297 ¶ms,
298 );
299 let result: BgpLeakResult = fetch_cloudflare(&self.client, &url).await?;
300 for event in result.events.iter() {
301 let changes = map_leak(&context.source_id, event, true);
302 total_events += send_changes(
303 &context.source_id,
304 &event_tx,
305 &context.sequence_counter,
306 changes,
307 &mut sent_ids,
308 )
309 .await?;
310 }
311 }
312 }
313
314 if self.config.categories.http_traffic
315 && query_requests_any(&request, &["HttpTraffic"], &[])
316 {
317 let params = vec![
318 ("dateRange", self.config.analytics_date_range.clone()),
319 ("format", "json".to_string()),
320 ];
321 let url = build_url(
322 &self.config.api_base_url,
323 "/radar/http/summary/device_type",
324 ¶ms,
325 );
326 let result: HttpSummaryResult = fetch_cloudflare(&self.client, &url).await?;
327 let effective_from = Utc::now().timestamp_millis() as u64;
328 for (series, summary) in result.summaries.iter() {
329 let node_id = format!("http-traffic-{}", normalize_id(series));
330 let change = map_http_summary(
331 &context.source_id,
332 &node_id,
333 series,
334 summary,
335 effective_from,
336 );
337 total_events += send_changes(
338 &context.source_id,
339 &event_tx,
340 &context.sequence_counter,
341 vec![change],
342 &mut sent_ids,
343 )
344 .await?;
345 }
346 }
347
348 if self.config.categories.attacks_l7 && query_requests_any(&request, &["AttackL7"], &[]) {
349 let params = vec![
350 ("dateRange", self.config.analytics_date_range.clone()),
351 ("format", "json".to_string()),
352 ];
353 let url = build_url(
354 &self.config.api_base_url,
355 "/radar/attacks/layer7/summary",
356 ¶ms,
357 );
358 let result: AttackSummaryResult = fetch_cloudflare(&self.client, &url).await?;
359 let effective_from = Utc::now().timestamp_millis() as u64;
360 for (series, summary) in result.summaries.iter() {
361 let node_id = format!("attack-l7-{}", normalize_id(series));
362 let change = map_attack_l7_summary(
363 &context.source_id,
364 &node_id,
365 series,
366 summary,
367 effective_from,
368 );
369 total_events += send_changes(
370 &context.source_id,
371 &event_tx,
372 &context.sequence_counter,
373 vec![change],
374 &mut sent_ids,
375 )
376 .await?;
377 }
378 }
379
380 if self.config.categories.attacks_l3 && query_requests_any(&request, &["AttackL3"], &[]) {
381 let params = vec![
382 ("dateRange", self.config.analytics_date_range.clone()),
383 ("format", "json".to_string()),
384 ];
385 let url = build_url(
386 &self.config.api_base_url,
387 "/radar/attacks/layer3/summary",
388 ¶ms,
389 );
390 let result: AttackLayer3SummaryResult = fetch_cloudflare(&self.client, &url).await?;
391 let effective_from = Utc::now().timestamp_millis() as u64;
392 for (series, summary) in result.summaries.iter() {
393 let node_id = format!("attack-l3-{}", normalize_id(series));
394 let change = map_attack_l3_summary(
395 &context.source_id,
396 &node_id,
397 series,
398 summary,
399 effective_from,
400 );
401 total_events += send_changes(
402 &context.source_id,
403 &event_tx,
404 &context.sequence_counter,
405 vec![change],
406 &mut sent_ids,
407 )
408 .await?;
409 }
410 }
411
412 if self.config.categories.domain_rankings
413 && query_requests_any(&request, &["Domain", "DomainRanking"], &["RANKED_AS"])
414 {
415 let params = vec![("limit", self.config.ranking_limit.to_string())];
416 let url = build_url(&self.config.api_base_url, "/radar/ranking/top", ¶ms);
417 let result: RankingResult = fetch_cloudflare(&self.client, &url).await?;
418 let effective_from = Utc::now().timestamp_millis() as u64;
419 for ranks in result.rankings.values() {
420 for domain in ranks {
421 let changes = map_domain_ranking(&context.source_id, domain, effective_from);
422 total_events += send_changes(
423 &context.source_id,
424 &event_tx,
425 &context.sequence_counter,
426 changes,
427 &mut sent_ids,
428 )
429 .await?;
430 }
431 }
432 }
433
434 if self.config.categories.dns && query_requests_any(&request, &["DnsQuerySummary"], &[]) {
435 let domains = self.config.dns_domains.clone().unwrap_or_default();
436 let effective_from = Utc::now().timestamp_millis() as u64;
437 for domain in domains {
438 let params = vec![
439 ("domain", domain.clone()),
440 ("dateRange", self.config.analytics_date_range.clone()),
441 ("format", "json".to_string()),
442 ("limit", "10".to_string()),
443 ];
444 let url = build_url(
445 &self.config.api_base_url,
446 "/radar/dns/top/locations",
447 ¶ms,
448 );
449 let result: DnsTopLocationsResult = fetch_cloudflare(&self.client, &url).await?;
450 if let Some(locations) = result.top.values().next() {
451 let change =
452 map_dns_summary(&context.source_id, &domain, locations, effective_from);
453 total_events += send_changes(
454 &context.source_id,
455 &event_tx,
456 &context.sequence_counter,
457 vec![change],
458 &mut sent_ids,
459 )
460 .await?;
461 }
462 }
463 }
464
465 info!(
466 "[{}] Cloudflare Radar bootstrap complete: {} events",
467 context.source_id, total_events
468 );
469
470 Ok(BootstrapResult {
471 event_count: total_events,
472 source_position: None,
473 })
474 }
475}
476
477fn build_client(api_token: &str) -> Result<Client> {
478 let mut headers = reqwest::header::HeaderMap::new();
479 headers.insert(
480 reqwest::header::AUTHORIZATION,
481 reqwest::header::HeaderValue::from_str(&format!("Bearer {api_token}"))?,
482 );
483
484 let client = Client::builder()
485 .default_headers(headers)
486 .timeout(std::time::Duration::from_secs(30))
487 .build()?;
488 Ok(client)
489}
490
491async fn fetch_cloudflare<T: DeserializeOwned>(client: &Client, url: &str) -> Result<T> {
492 let mut attempt = 0;
493 let mut delay = Duration::from_millis(500);
494 loop {
495 attempt += 1;
496 let response = match client.get(url).send().await {
497 Ok(response) => response,
498 Err(err) => {
499 if attempt >= 4 {
500 return Err(err.into());
501 }
502 warn!("Cloudflare API request failed ({err}); retrying in {delay:?}");
503 sleep(delay).await;
504 delay *= 2;
505 continue;
506 }
507 };
508
509 if response.status().is_success() {
510 let payload: CloudflareResponse<T> = response.json().await?;
511 if payload.success {
512 return Ok(payload.result);
513 }
514 return Err(anyhow!("Cloudflare API returned success=false"));
515 }
516
517 if response.status().as_u16() == 429 || response.status().is_server_error() {
518 if attempt >= 4 {
519 return Err(anyhow!(
520 "Cloudflare API error after retries: {}",
521 response.status()
522 ));
523 }
524 warn!(
525 "Cloudflare API rate limited or server error ({}); retrying in {:?}",
526 response.status(),
527 delay
528 );
529 sleep(delay).await;
530 delay *= 2;
531 continue;
532 }
533 return Err(anyhow!(
534 "Cloudflare API request failed: {}",
535 response.status()
536 ));
537 }
538}
539
540fn build_url(base: &str, path: &str, params: &[(&str, String)]) -> String {
541 if params.is_empty() {
542 format!("{base}{path}")
543 } else {
544 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
545 for (key, value) in params {
546 serializer.append_pair(key, value);
547 }
548 let query = serializer.finish();
549 format!("{base}{path}?{query}")
550 }
551}
552
553fn query_requests_any(
554 request: &BootstrapRequest,
555 node_labels: &[&str],
556 relation_labels: &[&str],
557) -> bool {
558 if request.node_labels.is_empty() && request.relation_labels.is_empty() {
559 return true;
560 }
561 node_labels
562 .iter()
563 .any(|label| request.node_labels.contains(&label.to_string()))
564 || relation_labels
565 .iter()
566 .any(|label| request.relation_labels.contains(&label.to_string()))
567}
568
569async fn send_changes(
570 source_id: &str,
571 event_tx: &BootstrapEventSender,
572 sequence_counter: &std::sync::Arc<std::sync::atomic::AtomicU64>,
573 changes: Vec<drasi_core::models::SourceChange>,
574 sent_ids: &mut HashSet<String>,
575) -> Result<usize> {
576 let mut count = 0usize;
577 for change in changes {
578 let reference = change.get_reference();
579 let key = format!("{}:{}", source_id, reference.element_id);
580 if !sent_ids.insert(key) {
581 continue;
582 }
583 let sequence = sequence_counter.fetch_add(1, Ordering::SeqCst);
584 let event = BootstrapEvent {
585 source_id: source_id.to_string(),
586 change,
587 timestamp: Utc::now(),
588 sequence,
589 };
590 event_tx
591 .send(event)
592 .await
593 .map_err(|err| anyhow!(err.to_string()))?;
594 count += 1;
595 }
596 Ok(count)
597}
598
599fn compute_outage_id(outage: &crate::api::OutageAnnotation) -> String {
600 let start = outage
601 .start_date
602 .clone()
603 .unwrap_or_else(|| "unknown".to_string());
604 let scope = outage.scope.clone().unwrap_or_default();
605 let mut locations = outage.locations.clone();
606 locations.sort();
607 let key = format!("{start}-{scope}-{}", locations.join(","));
608 format!("outage-{}", normalize_id(&key))
609}
610
611fn outage_matches_filters(
612 outage: &crate::api::OutageAnnotation,
613 location_filter: Option<&Vec<String>>,
614 asn_filter: Option<&Vec<u32>>,
615) -> bool {
616 if let Some(locations) = location_filter {
617 if !locations.is_empty()
618 && !outage
619 .locations
620 .iter()
621 .any(|location| locations.contains(location))
622 {
623 return false;
624 }
625 }
626
627 if let Some(asns) = asn_filter {
628 if !asns.is_empty() && !outage.asns.iter().any(|asn| asns.contains(asn)) {
629 return false;
630 }
631 }
632
633 true
634}
635
// Plugin export, compiled only when the `dynamic-plugin` feature is enabled.
// Registers `descriptor::CloudflareRadarBootstrapDescriptor` as the sole
// bootstrap descriptor; the source and reaction descriptor lists are empty.
// All three version fields are taken from this crate's `CARGO_PKG_VERSION`.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "cloudflare-radar-bootstrap",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [],
    reaction_descriptors = [],
    bootstrap_descriptors = [descriptor::CloudflareRadarBootstrapDescriptor],
);