Skip to main content

drasi_bootstrap_cloudflare_radar/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod api;
17mod config;
18pub mod descriptor;
19mod mapping;
20
21pub use config::{CategoryConfig, CloudflareRadarBootstrapConfig};
22
23use anyhow::{anyhow, Result};
24use async_trait::async_trait;
25use chrono::Utc;
26use drasi_lib::bootstrap::{
27    BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
28};
29use drasi_lib::channels::{BootstrapEvent, BootstrapEventSender};
30use log::{info, warn};
31use mapping::{
32    map_attack_l3_summary, map_attack_l7_summary, map_dns_summary, map_domain_ranking, map_hijack,
33    map_http_summary, map_leak, map_outage, normalize_id,
34};
35use reqwest::Client;
36use serde::de::DeserializeOwned;
37use std::collections::HashSet;
38use std::sync::atomic::Ordering;
39use tokio::time::{sleep, Duration};
40
41use crate::api::{
42    AttackLayer3SummaryResult, AttackSummaryResult, BgpHijackResult, BgpLeakResult,
43    CloudflareResponse, DnsTopLocationsResult, HttpSummaryResult, OutageResult, RankingResult,
44};
45
46pub struct CloudflareRadarBootstrapProvider {
47    config: CloudflareRadarBootstrapConfig,
48    client: Client,
49}
50
51impl CloudflareRadarBootstrapProvider {
52    pub fn builder() -> CloudflareRadarBootstrapProviderBuilder {
53        CloudflareRadarBootstrapProviderBuilder::new()
54    }
55}
56
57pub struct CloudflareRadarBootstrapProviderBuilder {
58    config: CloudflareRadarBootstrapConfig,
59}
60
61impl Default for CloudflareRadarBootstrapProviderBuilder {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
67impl CloudflareRadarBootstrapProviderBuilder {
68    pub fn new() -> Self {
69        Self {
70            config: CloudflareRadarBootstrapConfig::default(),
71        }
72    }
73
74    pub fn with_api_token(mut self, token: impl Into<String>) -> Self {
75        self.config.api_token = token.into();
76        self
77    }
78
79    pub fn with_api_base_url(mut self, base_url: impl Into<String>) -> Self {
80        self.config.api_base_url = base_url.into();
81        self
82    }
83
84    pub fn with_category(mut self, name: &str, enabled: bool) -> Self {
85        match name {
86            "outages" => self.config.categories.outages = enabled,
87            "bgp_hijacks" => self.config.categories.bgp_hijacks = enabled,
88            "bgp_leaks" => self.config.categories.bgp_leaks = enabled,
89            "http_traffic" => self.config.categories.http_traffic = enabled,
90            "attacks_l7" => self.config.categories.attacks_l7 = enabled,
91            "attacks_l3" => self.config.categories.attacks_l3 = enabled,
92            "domain_rankings" => self.config.categories.domain_rankings = enabled,
93            "dns" => self.config.categories.dns = enabled,
94            _ => {
95                log::warn!("Unknown Cloudflare Radar category: '{name}'");
96            }
97        }
98        self
99    }
100
101    pub fn with_location_filter(mut self, locations: Vec<String>) -> Self {
102        self.config.location_filter = Some(locations);
103        self
104    }
105
106    pub fn with_bgp_asn_filter(mut self, asns: Vec<u32>) -> Self {
107        self.config.asn_filter = Some(asns);
108        self
109    }
110
111    pub fn with_hijack_min_confidence(mut self, min_confidence: u32) -> Self {
112        self.config.hijack_min_confidence = Some(min_confidence);
113        self
114    }
115
116    pub fn with_ranking_limit(mut self, limit: u32) -> Self {
117        self.config.ranking_limit = limit;
118        self
119    }
120
121    pub fn with_dns_domains(mut self, domains: Vec<String>) -> Self {
122        self.config.dns_domains = Some(domains);
123        self
124    }
125
126    pub fn with_analytics_date_range(mut self, range: impl Into<String>) -> Self {
127        self.config.analytics_date_range = range.into();
128        self
129    }
130
131    pub fn with_event_date_range(mut self, range: impl Into<String>) -> Self {
132        self.config.event_date_range = range.into();
133        self
134    }
135
136    pub fn build(self) -> Result<CloudflareRadarBootstrapProvider> {
137        if self.config.api_token.trim().is_empty() {
138            return Err(anyhow!("Cloudflare Radar API token is required"));
139        }
140        if self.config.categories.dns
141            && self
142                .config
143                .dns_domains
144                .as_ref()
145                .map(|d| d.is_empty())
146                .unwrap_or(true)
147        {
148            return Err(anyhow!("DNS category enabled but no dns_domains provided"));
149        }
150        if !self.config.categories.outages
151            && !self.config.categories.bgp_hijacks
152            && !self.config.categories.bgp_leaks
153            && !self.config.categories.http_traffic
154            && !self.config.categories.attacks_l7
155            && !self.config.categories.attacks_l3
156            && !self.config.categories.domain_rankings
157            && !self.config.categories.dns
158        {
159            return Err(anyhow!("At least one category must be enabled"));
160        }
161
162        let client = build_client(&self.config.api_token)?;
163        Ok(CloudflareRadarBootstrapProvider {
164            config: self.config,
165            client,
166        })
167    }
168}
169
170#[async_trait]
171impl BootstrapProvider for CloudflareRadarBootstrapProvider {
172    async fn bootstrap(
173        &self,
174        request: BootstrapRequest,
175        context: &BootstrapContext,
176        event_tx: BootstrapEventSender,
177        _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
178    ) -> Result<BootstrapResult> {
179        info!(
180            "[{}] Cloudflare Radar bootstrap started for query {}",
181            context.source_id, request.query_id
182        );
183
184        let mut sent_ids: HashSet<String> = HashSet::new();
185        let mut total_events = 0usize;
186
187        if self.config.categories.outages
188            && query_requests_any(
189                &request,
190                &["Outage", "Location", "AutonomousSystem"],
191                &["AFFECTS_LOCATION", "AFFECTS_ASN"],
192            )
193        {
194            let url = build_url(
195                &self.config.api_base_url,
196                "/radar/annotations/outages",
197                &[
198                    ("limit", "50".to_string()),
199                    ("dateRange", self.config.event_date_range.clone()),
200                    ("format", "json".to_string()),
201                ],
202            );
203            let result: OutageResult = fetch_cloudflare(&self.client, &url).await?;
204            for outage in result.annotations.iter() {
205                if !outage_matches_filters(
206                    outage,
207                    self.config.location_filter.as_ref(),
208                    self.config.asn_filter.as_ref(),
209                ) {
210                    continue;
211                }
212                let outage_id = compute_outage_id(outage);
213                let changes = map_outage(&context.source_id, &outage_id, outage, true);
214                total_events += send_changes(
215                    &context.source_id,
216                    &event_tx,
217                    &context.sequence_counter,
218                    changes,
219                    &mut sent_ids,
220                )
221                .await?;
222            }
223        }
224
225        if self.config.categories.bgp_hijacks
226            && query_requests_any(
227                &request,
228                &["BgpHijack", "AutonomousSystem", "Prefix"],
229                &["HIJACKED_BY", "VICTIM_ASN", "TARGETS_PREFIX"],
230            )
231        {
232            let asns = self.config.asn_filter.clone().unwrap_or_default();
233            let mut targets = vec![];
234            if asns.is_empty() {
235                targets.push(None);
236            } else {
237                for asn in asns {
238                    targets.push(Some(asn));
239                }
240            }
241
242            for target_asn in targets {
243                let mut params = vec![
244                    ("per_page", "50".to_string()),
245                    ("format", "json".to_string()),
246                ];
247                if let Some(asn) = target_asn {
248                    params.push(("involvedAsn", asn.to_string()));
249                }
250                if let Some(min_conf) = self.config.hijack_min_confidence {
251                    params.push(("minConfidence", min_conf.to_string()));
252                }
253                let url = build_url(
254                    &self.config.api_base_url,
255                    "/radar/bgp/hijacks/events",
256                    &params,
257                );
258                let result: BgpHijackResult = fetch_cloudflare(&self.client, &url).await?;
259                for event in result.events.iter() {
260                    let changes = map_hijack(&context.source_id, event, true);
261                    total_events += send_changes(
262                        &context.source_id,
263                        &event_tx,
264                        &context.sequence_counter,
265                        changes,
266                        &mut sent_ids,
267                    )
268                    .await?;
269                }
270            }
271        }
272
273        if self.config.categories.bgp_leaks
274            && query_requests_any(&request, &["BgpLeak", "AutonomousSystem"], &["LEAKED_BY"])
275        {
276            let asns = self.config.asn_filter.clone().unwrap_or_default();
277            let mut targets = vec![];
278            if asns.is_empty() {
279                targets.push(None);
280            } else {
281                for asn in asns {
282                    targets.push(Some(asn));
283                }
284            }
285
286            for target_asn in targets {
287                let mut params = vec![
288                    ("per_page", "50".to_string()),
289                    ("format", "json".to_string()),
290                ];
291                if let Some(asn) = target_asn {
292                    params.push(("involvedAsn", asn.to_string()));
293                }
294                let url = build_url(
295                    &self.config.api_base_url,
296                    "/radar/bgp/leaks/events",
297                    &params,
298                );
299                let result: BgpLeakResult = fetch_cloudflare(&self.client, &url).await?;
300                for event in result.events.iter() {
301                    let changes = map_leak(&context.source_id, event, true);
302                    total_events += send_changes(
303                        &context.source_id,
304                        &event_tx,
305                        &context.sequence_counter,
306                        changes,
307                        &mut sent_ids,
308                    )
309                    .await?;
310                }
311            }
312        }
313
314        if self.config.categories.http_traffic
315            && query_requests_any(&request, &["HttpTraffic"], &[])
316        {
317            let params = vec![
318                ("dateRange", self.config.analytics_date_range.clone()),
319                ("format", "json".to_string()),
320            ];
321            let url = build_url(
322                &self.config.api_base_url,
323                "/radar/http/summary/device_type",
324                &params,
325            );
326            let result: HttpSummaryResult = fetch_cloudflare(&self.client, &url).await?;
327            let effective_from = Utc::now().timestamp_millis() as u64;
328            for (series, summary) in result.summaries.iter() {
329                let node_id = format!("http-traffic-{}", normalize_id(series));
330                let change = map_http_summary(
331                    &context.source_id,
332                    &node_id,
333                    series,
334                    summary,
335                    effective_from,
336                );
337                total_events += send_changes(
338                    &context.source_id,
339                    &event_tx,
340                    &context.sequence_counter,
341                    vec![change],
342                    &mut sent_ids,
343                )
344                .await?;
345            }
346        }
347
348        if self.config.categories.attacks_l7 && query_requests_any(&request, &["AttackL7"], &[]) {
349            let params = vec![
350                ("dateRange", self.config.analytics_date_range.clone()),
351                ("format", "json".to_string()),
352            ];
353            let url = build_url(
354                &self.config.api_base_url,
355                "/radar/attacks/layer7/summary",
356                &params,
357            );
358            let result: AttackSummaryResult = fetch_cloudflare(&self.client, &url).await?;
359            let effective_from = Utc::now().timestamp_millis() as u64;
360            for (series, summary) in result.summaries.iter() {
361                let node_id = format!("attack-l7-{}", normalize_id(series));
362                let change = map_attack_l7_summary(
363                    &context.source_id,
364                    &node_id,
365                    series,
366                    summary,
367                    effective_from,
368                );
369                total_events += send_changes(
370                    &context.source_id,
371                    &event_tx,
372                    &context.sequence_counter,
373                    vec![change],
374                    &mut sent_ids,
375                )
376                .await?;
377            }
378        }
379
380        if self.config.categories.attacks_l3 && query_requests_any(&request, &["AttackL3"], &[]) {
381            let params = vec![
382                ("dateRange", self.config.analytics_date_range.clone()),
383                ("format", "json".to_string()),
384            ];
385            let url = build_url(
386                &self.config.api_base_url,
387                "/radar/attacks/layer3/summary",
388                &params,
389            );
390            let result: AttackLayer3SummaryResult = fetch_cloudflare(&self.client, &url).await?;
391            let effective_from = Utc::now().timestamp_millis() as u64;
392            for (series, summary) in result.summaries.iter() {
393                let node_id = format!("attack-l3-{}", normalize_id(series));
394                let change = map_attack_l3_summary(
395                    &context.source_id,
396                    &node_id,
397                    series,
398                    summary,
399                    effective_from,
400                );
401                total_events += send_changes(
402                    &context.source_id,
403                    &event_tx,
404                    &context.sequence_counter,
405                    vec![change],
406                    &mut sent_ids,
407                )
408                .await?;
409            }
410        }
411
412        if self.config.categories.domain_rankings
413            && query_requests_any(&request, &["Domain", "DomainRanking"], &["RANKED_AS"])
414        {
415            let params = vec![("limit", self.config.ranking_limit.to_string())];
416            let url = build_url(&self.config.api_base_url, "/radar/ranking/top", &params);
417            let result: RankingResult = fetch_cloudflare(&self.client, &url).await?;
418            let effective_from = Utc::now().timestamp_millis() as u64;
419            for ranks in result.rankings.values() {
420                for domain in ranks {
421                    let changes = map_domain_ranking(&context.source_id, domain, effective_from);
422                    total_events += send_changes(
423                        &context.source_id,
424                        &event_tx,
425                        &context.sequence_counter,
426                        changes,
427                        &mut sent_ids,
428                    )
429                    .await?;
430                }
431            }
432        }
433
434        if self.config.categories.dns && query_requests_any(&request, &["DnsQuerySummary"], &[]) {
435            let domains = self.config.dns_domains.clone().unwrap_or_default();
436            let effective_from = Utc::now().timestamp_millis() as u64;
437            for domain in domains {
438                let params = vec![
439                    ("domain", domain.clone()),
440                    ("dateRange", self.config.analytics_date_range.clone()),
441                    ("format", "json".to_string()),
442                    ("limit", "10".to_string()),
443                ];
444                let url = build_url(
445                    &self.config.api_base_url,
446                    "/radar/dns/top/locations",
447                    &params,
448                );
449                let result: DnsTopLocationsResult = fetch_cloudflare(&self.client, &url).await?;
450                if let Some(locations) = result.top.values().next() {
451                    let change =
452                        map_dns_summary(&context.source_id, &domain, locations, effective_from);
453                    total_events += send_changes(
454                        &context.source_id,
455                        &event_tx,
456                        &context.sequence_counter,
457                        vec![change],
458                        &mut sent_ids,
459                    )
460                    .await?;
461                }
462            }
463        }
464
465        info!(
466            "[{}] Cloudflare Radar bootstrap complete: {} events",
467            context.source_id, total_events
468        );
469
470        Ok(BootstrapResult {
471            event_count: total_events,
472            source_position: None,
473        })
474    }
475}
476
477fn build_client(api_token: &str) -> Result<Client> {
478    let mut headers = reqwest::header::HeaderMap::new();
479    headers.insert(
480        reqwest::header::AUTHORIZATION,
481        reqwest::header::HeaderValue::from_str(&format!("Bearer {api_token}"))?,
482    );
483
484    let client = Client::builder()
485        .default_headers(headers)
486        .timeout(std::time::Duration::from_secs(30))
487        .build()?;
488    Ok(client)
489}
490
491async fn fetch_cloudflare<T: DeserializeOwned>(client: &Client, url: &str) -> Result<T> {
492    let mut attempt = 0;
493    let mut delay = Duration::from_millis(500);
494    loop {
495        attempt += 1;
496        let response = match client.get(url).send().await {
497            Ok(response) => response,
498            Err(err) => {
499                if attempt >= 4 {
500                    return Err(err.into());
501                }
502                warn!("Cloudflare API request failed ({err}); retrying in {delay:?}");
503                sleep(delay).await;
504                delay *= 2;
505                continue;
506            }
507        };
508
509        if response.status().is_success() {
510            let payload: CloudflareResponse<T> = response.json().await?;
511            if payload.success {
512                return Ok(payload.result);
513            }
514            return Err(anyhow!("Cloudflare API returned success=false"));
515        }
516
517        if response.status().as_u16() == 429 || response.status().is_server_error() {
518            if attempt >= 4 {
519                return Err(anyhow!(
520                    "Cloudflare API error after retries: {}",
521                    response.status()
522                ));
523            }
524            warn!(
525                "Cloudflare API rate limited or server error ({}); retrying in {:?}",
526                response.status(),
527                delay
528            );
529            sleep(delay).await;
530            delay *= 2;
531            continue;
532        }
533        return Err(anyhow!(
534            "Cloudflare API request failed: {}",
535            response.status()
536        ));
537    }
538}
539
540fn build_url(base: &str, path: &str, params: &[(&str, String)]) -> String {
541    if params.is_empty() {
542        format!("{base}{path}")
543    } else {
544        let mut serializer = url::form_urlencoded::Serializer::new(String::new());
545        for (key, value) in params {
546            serializer.append_pair(key, value);
547        }
548        let query = serializer.finish();
549        format!("{base}{path}?{query}")
550    }
551}
552
553fn query_requests_any(
554    request: &BootstrapRequest,
555    node_labels: &[&str],
556    relation_labels: &[&str],
557) -> bool {
558    if request.node_labels.is_empty() && request.relation_labels.is_empty() {
559        return true;
560    }
561    node_labels
562        .iter()
563        .any(|label| request.node_labels.contains(&label.to_string()))
564        || relation_labels
565            .iter()
566            .any(|label| request.relation_labels.contains(&label.to_string()))
567}
568
569async fn send_changes(
570    source_id: &str,
571    event_tx: &BootstrapEventSender,
572    sequence_counter: &std::sync::Arc<std::sync::atomic::AtomicU64>,
573    changes: Vec<drasi_core::models::SourceChange>,
574    sent_ids: &mut HashSet<String>,
575) -> Result<usize> {
576    let mut count = 0usize;
577    for change in changes {
578        let reference = change.get_reference();
579        let key = format!("{}:{}", source_id, reference.element_id);
580        if !sent_ids.insert(key) {
581            continue;
582        }
583        let sequence = sequence_counter.fetch_add(1, Ordering::SeqCst);
584        let event = BootstrapEvent {
585            source_id: source_id.to_string(),
586            change,
587            timestamp: Utc::now(),
588            sequence,
589        };
590        event_tx
591            .send(event)
592            .await
593            .map_err(|err| anyhow!(err.to_string()))?;
594        count += 1;
595    }
596    Ok(count)
597}
598
599fn compute_outage_id(outage: &crate::api::OutageAnnotation) -> String {
600    let start = outage
601        .start_date
602        .clone()
603        .unwrap_or_else(|| "unknown".to_string());
604    let scope = outage.scope.clone().unwrap_or_default();
605    let mut locations = outage.locations.clone();
606    locations.sort();
607    let key = format!("{start}-{scope}-{}", locations.join(","));
608    format!("outage-{}", normalize_id(&key))
609}
610
611fn outage_matches_filters(
612    outage: &crate::api::OutageAnnotation,
613    location_filter: Option<&Vec<String>>,
614    asn_filter: Option<&Vec<u32>>,
615) -> bool {
616    if let Some(locations) = location_filter {
617        if !locations.is_empty()
618            && !outage
619                .locations
620                .iter()
621                .any(|location| locations.contains(location))
622        {
623            return false;
624        }
625    }
626
627    if let Some(asns) = asn_filter {
628        if !asns.is_empty() && !outage.asns.iter().any(|asn| asns.contains(asn)) {
629            return false;
630        }
631    }
632
633    true
634}
635
636/// Dynamic plugin entry point.
637#[cfg(feature = "dynamic-plugin")]
638drasi_plugin_sdk::export_plugin!(
639    plugin_id = "cloudflare-radar-bootstrap",
640    core_version = env!("CARGO_PKG_VERSION"),
641    lib_version = env!("CARGO_PKG_VERSION"),
642    plugin_version = env!("CARGO_PKG_VERSION"),
643    source_descriptors = [],
644    reaction_descriptors = [],
645    bootstrap_descriptors = [descriptor::CloudflareRadarBootstrapDescriptor],
646);