1use std::{cmp, fmt, fs, thread};
22use std::borrow::Cow;
23use std::collections::HashMap;
24use std::fs::File;
25use std::path::{Path, PathBuf};
26use std::sync::{Arc, Mutex};
27use std::sync::atomic::{AtomicBool, Ordering};
28use bytes::Bytes;
29use crossbeam_queue::{ArrayQueue, SegQueue};
30use log::{debug, error, info, warn};
31use rand::seq::SliceRandom;
32use rpki::crypto::keys::KeyIdentifier;
33#[allow(unused_imports)]
34use rpki::repository::aspa::{Aspa, AsProviderAttestation};
35use rpki::repository::cert::{Cert, KeyUsage, ResourceCert};
36use rpki::repository::crl::Crl;
37use rpki::repository::error::{InspectionError, ValidationError};
38use rpki::repository::manifest::{Manifest, ManifestContent, ManifestHash};
39use rpki::repository::roa::{Roa, RouteOriginAttestation};
40use rpki::repository::sigobj::SignedObject;
41use rpki::repository::tal::{Tal, TalInfo, TalUri};
42use rpki::repository::x509::{Time, Validity};
43use rpki::uri;
44use crate::{collector, store, tals};
45use crate::config::{Config, FilterPolicy};
46use crate::collector::Collector;
47use crate::error::{Failed, Fatal, RunFailed};
48use crate::log::{LogBook, LogBookWriter};
49use crate::metrics::{
50 Metrics, PublicationMetrics, RepositoryMetrics, TalMetrics
51};
52use crate::store::{
53 Store, StoredManifest, StoredObject, StoredPoint, StoredStatus
54};
55use crate::utils::str::str_from_ascii;
56
57
/// Manifest size above which the serial numbers of a CRL are cached.
///
/// When a manifest lists more than this many entries, `cache_serials` is
/// called on the publication point’s CRL before revocation look-ups are
/// made against it.
const CRL_CACHE_LIMIT: usize = 50;
65
66
/// The validation engine.
///
/// Holds the configuration values, the set of TALs, the optional collector
/// and the store that validation runs created via `start` work with.
#[derive(Debug)]
pub struct Engine {
    /// The TALs obtained from `tals::collect_tals` when the engine is
    /// created. They form the starting set whenever TALs are reloaded.
    bundled_tals: Vec<Tal>,

    /// An optional directory that is scanned for additional `*.tal` files
    /// whenever TALs are reloaded.
    extra_tals_dir: Option<PathBuf>,

    /// Maps the file name of a TAL to the label that should be used for it
    /// instead of the file stem.
    tal_labels: HashMap<String, String>,

    /// The TALs currently in use, sorted by name.
    ///
    /// Replaced wholesale by `reload_tals`.
    tals: Vec<Tal>,

    /// The collector, if any.
    ///
    /// This is `None` if the engine was created with `update` set to
    /// `false` or after `disable_collector` has been called.
    collector: Option<Collector>,

    /// The store.
    store: Store,

    /// The strict flag passed on to the decoding and validation functions.
    strict: bool,

    /// The policy applied to stale manifests and CRLs.
    stale: FilterPolicy,

    /// The number of worker threads spawned by a validation run.
    validation_threads: usize,

    /// If set, `Run::cleanup` returns without cleaning anything.
    dirty_repository: bool,

    /// The maximum CA depth handed to `CaCert::chain`.
    max_ca_depth: usize,

    /// If set, the log writer of a publication point is created with a
    /// prefix consisting of the CA repository URI followed by `": "`.
    log_repository_issues: bool,
}
126
127impl Engine {
128 pub fn init(config: &Config) -> Result<(), Failed> {
135 Collector::init(config)?;
136 Store::init(config)?;
137 Ok(())
138 }
139
140 pub fn new(
148 config: &Config,
149 update: bool,
150 ) -> Result<Self, Failed> {
151 let collector = if update {
152 Some(Collector::new(config)?)
153 }
154 else {
155 None
156 };
157 let store = Store::new(config)?;
158 let mut res = Engine {
159 bundled_tals: tals::collect_tals(config)?,
160 extra_tals_dir: config.extra_tals_dir.clone(),
161 tal_labels: config.tal_labels.clone(),
162 tals: Vec::new(),
163 collector,
164 store,
165 strict: config.strict,
166 stale: config.stale,
167 validation_threads: config.validation_threads,
168 dirty_repository: config.dirty_repository,
169 max_ca_depth: config.max_ca_depth,
170 log_repository_issues: config.log_repository_issues,
171 };
172 res.reload_tals()?;
173 Ok(res)
174 }
175
176 pub fn disable_collector(&mut self) {
180 self.collector = None;
181 }
182
183 pub fn store_status(&self) -> Result<Option<StoredStatus>, Failed> {
185 self.store.status()
186 }
187
188 pub fn reload_tals(&mut self) -> Result<(), Failed> {
198 let mut res = self.bundled_tals.clone();
199 if let Some(extra_tals_dir) = self.extra_tals_dir.as_ref() {
200 let dir = match fs::read_dir(extra_tals_dir) {
201 Ok(dir) => dir,
202 Err(err) => {
203 error!("Failed to open TAL directory {}: {}.",
204 extra_tals_dir.display(), err
205 );
206 return Err(Failed)
207 }
208 };
209 for entry in dir {
210 let entry = match entry {
211 Ok(entry) => entry,
212 Err(err) => {
213 error!(
214 "Failed to iterate over tal directory: {err}"
215 );
216 return Err(Failed)
217 }
218 };
219
220 if !entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) {
221 continue
222 }
223
224 let path = entry.path();
225 if path.extension().map(|ext| ext != "tal").unwrap_or(true) {
226 continue
227 }
228
229 let mut file = match File::open(&path) {
230 Ok(file) => {
231 file
232 }
233 Err(err) => {
234 error!(
235 "Failed to open TAL {}: {}. \n\
236 Aborting.",
237 path.display(), err
238 );
239 return Err(Failed)
240 }
241 };
242 let mut tal = match Tal::read_named(
243 self.path_to_tal_label(&path),
244 &mut file
245 ) {
246 Ok(tal) => tal,
247 Err(err) => {
248 error!(
249 "Failed to read TAL {}: {}. \n\
250 Aborting.",
251 path.display(), err
252 );
253 return Err(Failed)
254 }
255 };
256 tal.prefer_https();
257 res.push(tal);
258 }
259 }
260 if res.is_empty() {
261 warn!(
262 "No TALs provided. Starting anyway."
263 );
264 }
265 res.sort_by(|left, right| {
266 left.info().name().cmp(right.info().name())
267 });
268 self.tals = res;
269
270 Ok(())
271 }
272
273 fn path_to_tal_label(&self, path: &Path) -> String {
279 if let Some(name) = path.file_name().unwrap().to_str() {
280 if let Some(label) = self.tal_labels.get(name) {
281 return label.clone()
282 }
283 }
284 path.file_stem().unwrap().to_string_lossy().into_owned()
285 }
286
287 pub fn ignite(&mut self) -> Result<(), Failed> {
292 if let Some(collector) = self.collector.as_mut() {
293 collector.ignite()?;
294 }
295 Ok(())
296 }
297
298 pub fn sanitize(&self) -> Result<(), Fatal> {
303 self.store.sanitize()?;
304 if let Some(collector) = self.collector.as_ref() {
305 collector.sanitize()?;
306 }
307 Ok(())
308 }
309
310 pub fn start<P: ProcessRun>(
323 &self, processor: P, initial: bool,
324 ) -> Result<Run<'_, P>, Failed> {
325 info!("Using the following TALs:");
326 for tal in &self.tals {
327 info!(" * {}", tal.info().name());
328 }
329 Ok(Run::new(
330 self,
331 if initial {
332 None
333 }
334 else {
335 self.collector.as_ref().map(Collector::start)
336 },
337 self.store.start(),
338 processor,
339 initial,
340 ))
341 }
342
343 pub fn dump(&self, dir: &Path) -> Result<(), Failed> {
345 self.store.dump(dir)?;
346 if let Some(collector) = self.collector.as_ref() {
347 collector.dump(dir)?;
348 }
349 Ok(())
350 }
351}
352
353
/// A single validation run.
///
/// Created by `Engine::start`. The type parameter `P` is the processor
/// that receives the validated data.
pub struct Run<'a, P> {
    /// The engine this run belongs to.
    validation: &'a Engine,

    /// The collector run, if any.
    ///
    /// `Engine::start` leaves this empty for initial runs and for engines
    /// without a collector.
    collector: Option<collector::Run<'a>>,

    /// The store run.
    store: store::Run<'a>,

    /// The processor handed in when the run was started.
    processor: P,

    /// Whether this is an initial run.
    ///
    /// In an initial run, a TAL without a usable trust anchor or a
    /// publication point that is new to the store marks the run as failed
    /// with a retry error.
    initial: bool,

    /// Set once any part of the run has reported a failure via
    /// `run_failed`.
    had_err: AtomicBool,

    /// Set in addition to `had_err` if a reported failure was fatal.
    is_fatal: AtomicBool,

    /// The metrics collected during the run; returned by `done`.
    metrics: Metrics,
}
389
390impl<'a, P> Run<'a, P> {
391 fn new(
393 validation: &'a Engine,
394 collector: Option<collector::Run<'a>>,
395 store: store::Run<'a>,
396 processor: P,
397 initial: bool
398 ) -> Self {
399 Run {
400 validation, collector, store, processor, initial,
401 had_err: AtomicBool::new(false),
402 is_fatal: AtomicBool::new(false),
403 metrics: Default::default()
404 }
405 }
406
407 pub fn cleanup(&mut self) -> Result<(), Failed> {
409 if self.validation.dirty_repository {
410 debug!("Skipping cleanup as configured.");
411 return Ok(())
412 }
413
414 let mut retain = collector::Cleanup::new();
415 self.store.cleanup(&mut retain)?;
416 if let Some(collector) = self.collector.as_mut() {
417 collector.cleanup(&mut retain)?;
418 }
419 Ok(())
420 }
421
422 pub fn done(self) -> Metrics {
427 let mut metrics = self.metrics;
428 if let Some(collector) = self.collector {
429 collector.done(&mut metrics)
430 }
431 self.store.done(&mut metrics);
432 metrics
433 }
434}
435
436impl<P: ProcessRun> Run<'_, P> {
437 pub fn process(&mut self) -> Result<(), RunFailed> {
439 if self.validation.tals.is_empty() {
441 return Ok(())
442 }
443
444 let metrics = RunMetrics::default();
446 let tasks = SegQueue::new();
447 for (index, tal) in self.validation.tals.iter().enumerate() {
448 tasks.push(Task::Tal(TalTask { tal, index }));
449 self.metrics.tals.push(TalMetrics::new(tal.info().clone()));
450 }
451
452 let thread_metrics = ArrayQueue::new(
456 self.validation.validation_threads
457 );
458 thread::scope(|scope| {
459 for _ in 0 .. self.validation.validation_threads {
460 scope.spawn(|| {
461 let mut metrics = metrics.fork();
462 while let Some(task) = tasks.pop() {
463 if self.process_task(
464 task, &tasks, &mut metrics,
465 ).is_err() {
466 break;
467 }
468 }
469 thread_metrics.push(metrics).unwrap();
470 });
471 }
472 });
473
474 if self.had_err.load(Ordering::Relaxed) {
475 if self.is_fatal.load(Ordering::Relaxed) {
476 return Err(RunFailed::fatal())
477 }
478 else {
479 return Err(RunFailed::retry())
480 }
481 }
482
483 metrics.prepare_final(&mut self.metrics);
484 while let Some(metrics) = thread_metrics.pop() {
485 metrics.collapse(&mut self.metrics);
486 }
487
488 Ok(())
489 }
490
491 fn process_task(
493 &self,
494 task: Task<P::PubPoint>,
495 tasks: &SegQueue<Task<P::PubPoint>>,
496 metrics: &mut RunMetrics,
497 ) -> Result<(), Failed> {
498 match task {
499 Task::Tal(task) => {
500 self.process_tal_task(task, tasks, metrics)
501 }
502 Task::Ca(task) => {
503 self.process_ca_task(task, tasks, metrics)
504 }
505 }
506 }
507
508 fn process_tal_task(
510 &self, task: TalTask,
511 tasks: &SegQueue<Task<P::PubPoint>>,
512 metrics: &mut RunMetrics,
513 ) -> Result<(), Failed> {
514 for uri in task.tal.uris() {
515 let cert = match self.load_ta(uri, task.tal.info())? {
516 Some(cert) => cert,
517 _ => continue,
518 };
519 if cert.subject_public_key_info() != task.tal.key_info() {
520 warn!(
521 "Trust anchor {uri}: key doesn’t match TAL."
522 );
523 continue;
524 }
525 let cert = match cert.validate_ta(
526 task.tal.info().clone(), self.validation.strict
527 ) {
528 Ok(cert) => CaCert::root(cert, uri.clone(), task.index),
529 Err(err) => {
530 warn!("Trust anchor {uri}: {err}.");
531 continue;
532 }
533 };
534 let cert = match cert {
535 Ok(cert) => cert,
536 Err(_) => continue,
537 };
538 debug!("Found valid trust anchor {uri}. Processing.");
539
540 match self.processor.process_ta(
541 task.tal, uri, &cert, cert.tal
542 )? {
543 Some(processor) => {
544 return self.process_ca_task(
545 CaTask {
546 cert, processor,
547 repository_index: None,
548 defer: false,
549 },
550 tasks, metrics,
551 )
552 }
553 None => {
554 debug!("Skipping trust anchor {uri}.");
555 return Ok(())
556 }
557 }
558 }
559 if self.initial {
560 info!(
561 "Initial quick validation failed: \
562 no trust anchor for TAL {}.",
563 task.tal.info().name()
564 );
565 self.run_failed(RunFailed::retry());
566 Err(Failed)
567 }
568 else {
569 warn!("No valid trust anchor for TAL {}", task.tal.info().name());
570 Ok(())
571 }
572 }
573
574 fn load_ta(
579 &self,
580 uri: &TalUri,
581 _info: &TalInfo,
582 ) -> Result<Option<Cert>, Failed> {
583 if let Some(collector) = self.collector.as_ref() {
585 if let Some(bytes) = collector.load_ta(uri) {
586 if let Ok(cert) = Cert::decode(bytes.clone()) {
587 self.store.update_ta(uri, &bytes)?;
588 return Ok(Some(cert))
589 }
590 }
591 }
592
593 self.store.load_ta(uri).map(|bytes| {
595 bytes.and_then(|bytes| Cert::decode(bytes).ok())
596 })
597 }
598
599 fn process_ca_task(
601 &self,
602 task: CaTask<P::PubPoint>,
603 tasks: &SegQueue<Task<P::PubPoint>>,
604 metrics: &mut RunMetrics,
605 ) -> Result<(), Failed> {
606 let more_tasks = PubPoint::new(
607 self, &task.cert, task.processor, task.repository_index,
608 ).and_then(|point| {
609 point.process(metrics)
610 }).map_err(|err| {
611 self.run_failed(err);
612 Failed
613 })?;
614 for task in more_tasks {
615 if self.had_err.load(Ordering::Relaxed) {
616 return Err(Failed)
617 }
618 if task.defer {
619 tasks.push(Task::Ca(task))
620 }
621 else {
622 self.process_ca_task(task, tasks, metrics)?;
623 }
624 }
625 Ok(())
626 }
627
628 fn run_failed(&self, err: RunFailed) {
630 self.had_err.store(true, Ordering::Relaxed);
631 if err.is_fatal() {
632 self.is_fatal.store(true, Ordering::Relaxed);
633 }
634 }
635}
636
637
/// The state needed while processing a single publication point.
struct PubPoint<'a, P: ProcessRun> {
    /// The validation run this point is processed in.
    run: &'a Run<'a, P>,

    /// The CA certificate whose publication point is being processed.
    cert: &'a Arc<CaCert>,

    /// The processor for this publication point.
    processor: P::PubPoint,

    /// The index of the repository in the run metrics, if already known.
    ///
    /// If this is `None`, the index is looked up through the run metrics
    /// when the point’s metrics are applied.
    repository_index: Option<usize>,

    /// The publication metrics gathered for this point.
    metrics: PublicationMetrics,

    /// The writer that collects the warnings issued for this point.
    log: LogBookWriter,
}
665
666impl<'a, P: ProcessRun> PubPoint<'a, P> {
667 pub fn new(
669 run: &'a Run<'a, P>,
670 cert: &'a Arc<CaCert>,
671 processor: P::PubPoint,
672 repository_index: Option<usize>,
673 ) -> Result<Self, RunFailed> {
674 Ok(PubPoint {
675 run, cert, processor, repository_index,
676 metrics: Default::default(),
677 log: LogBookWriter::new(
678 run.validation.log_repository_issues.then(|| {
679 format!("{}: ", cert.ca_repository())
680 })
681 ),
682 })
683 }
684
685 pub fn process(
690 self,
691 metrics: &mut RunMetrics,
692 ) -> Result<Vec<CaTask<P::PubPoint>>, RunFailed> {
693 let mut store = self.run.store.pub_point(self.cert)?;
694 if self.run.initial && store.is_new() {
695 info!(
696 "Initial quick validation failed: \
697 encountered new publication point '{}'.",
698 self.cert.rpki_manifest()
699 );
700 self.run.run_failed(RunFailed::retry());
701 return Err(RunFailed::retry());
702 }
703 if let Some(collector) = self.run.collector.as_ref() {
704 if let Some(collector) = collector.repository(self.cert)? {
705 match self.process_collected(
706 collector, &mut store, metrics
707 )? {
708 Ok(res) => return Ok(res),
709 Err(mut this) => {
710 this.metrics = Default::default();
711 return Ok(this.process_stored(store, metrics)?)
712 }
713 }
714 }
715 }
716 Ok(self.process_stored(store, metrics)?)
717 }
718
719 #[allow(clippy::type_complexity)] fn process_collected(
732 mut self,
733 collector: collector::Repository,
734 store: &mut StoredPoint,
735 metrics: &mut RunMetrics,
736 ) -> Result<Result<Vec<CaTask<P::PubPoint>>, Self>, RunFailed> {
737 let collected = match collector.load_object(
740 self.cert.rpki_manifest()
741 )? {
742 Some(collected) => collected,
743 None => return Ok(Err(self))
744 };
745
746 let same = if let Some(mft) = store.manifest() {
751 mft.manifest == collected
752 && mft.ca_repository == *self.cert.ca_repository()
753 }
754 else {
755 false
756 };
757 if same {
758 return Ok(Err(self))
759 }
760
761 let mut collected = match self.validate_collected_manifest(
764 collected, &collector
765 )? {
766 Some(collected) => collected,
767 None => {
768 return Ok(Err(self))
769 }
770 };
771
772 if !self.check_collected_is_newer(&collected, store)? {
775 return Ok(Err(self))
776 }
777
778 collected.point_validity(&mut self.processor);
782
783 let mut ca_tasks = Vec::new();
797
798 let mut items_random: Vec<_> = collected.content.iter().collect();
803 items_random.shuffle(&mut rand::rng());
804 let mut items = items_random.into_iter();
805
806 let mut point_ok = true;
807 let update_result = store.update(
808 &self.run.validation.store,
809 StoredManifest::new(
810 &collected.ee_cert,
811 &collected.content,
812 self.cert,
813 collected.manifest_bytes.clone(),
814 collected.crl_uri.clone(),
815 collected.crl_bytes.clone(),
816 ),
817 || {
818 let item = match items.next() {
819 Some(item) => item,
820 None => return Ok(None)
821 };
822
823 let file = match str_from_ascii(item.file()) {
824 Ok(file) => file,
825 Err(_) => {
826 self.log.warn(format_args!(
827 "manifest {} contains illegal file name '{}'.",
828 self.cert.rpki_manifest(),
829 String::from_utf8_lossy(item.file())
830 ));
831 return Err(store::UpdateError::Abort)
832 }
833 };
834 let uri = self.cert.ca_repository().join(
835 file.as_ref()
836 ).unwrap();
837
838 let hash = ManifestHash::new(
839 item.hash().clone(), collected.content.file_hash_alg()
840 );
841
842 let content = match collector.load_object(&uri)? {
843 Some(content) => content,
844 None => {
845 self.log.warn(format_args!(
846 "{uri}: failed to load."
847 ));
848 return Err(store::UpdateError::Abort)
849 }
850 };
851
852 if hash.verify(&content).is_err() {
853 self.log.warn(format_args!(
854 "{uri}: file has wrong manifest hash."
855 ));
856 return Err(store::UpdateError::Abort)
857 }
858
859 if !self.process_object(
860 &uri, content.clone(),
861 &mut collected, &mut ca_tasks
862 )? {
863 point_ok = false;
864 }
865
866 Ok(Some(StoredObject::new(uri, content, Some(hash))))
867 }
868 );
869
870 match update_result {
871 Ok(()) => {
872 if point_ok {
875 self.accept_point(collected, metrics);
876 Ok(Ok(ca_tasks))
877 }
878 else {
879 self.reject_point(metrics);
880 Ok(Ok(Vec::new()))
881 }
882 }
883 Err(store::UpdateError::Abort) => {
884 Ok(Err(self))
886 }
887 Err(store::UpdateError::Failed(err)) => {
888 Err(err)
890 }
891 }
892 }
893
894 fn validate_collected_manifest(
901 &mut self,
902 manifest_bytes: Bytes,
903 repository: &collector::Repository,
904 ) -> Result<Option<ValidPointManifest>, RunFailed> {
905 let manifest = match Manifest::decode(
906 manifest_bytes.clone(), self.run.validation.strict
907 ) {
908 Ok(manifest) => manifest,
909 Err(_) => {
910 self.metrics.invalid_manifests += 1;
911 self.log.warn(format_args!(
912 "failed to decode manifest {}.",
913 self.cert.rpki_manifest()
914 ));
915 return Ok(None)
916 }
917 };
918 let (ee_cert, content) = match manifest.validate(
919 self.cert.cert(), self.run.validation.strict
920 ) {
921 Ok(some) => some,
922 Err(err) => {
923 self.metrics.invalid_manifests += 1;
924 self.log.warn(format_args!(
925 "manifest {}: {}.", self.cert.rpki_manifest(), err
926 ));
927 return Ok(None)
928 }
929 };
930
931 if content.this_update() > Time::now() {
932 self.metrics.premature_manifests += 1;
933 self.log.warn(format_args!(
934 "premature manifest {}", self.cert.rpki_manifest()
935 ));
936 return Ok(None)
937 }
938
939 if content.is_stale() {
940 self.metrics.stale_manifests += 1;
941 match self.run.validation.stale {
942 FilterPolicy::Reject => {
943 self.log.warn(format_args!(
944 "rejecting stale manifest {}",
945 self.cert.rpki_manifest()
946 ));
947 return Ok(None)
948 }
949 FilterPolicy::Warn => {
950 self.log.warn(format_args!(
951 "stale manifest {}", self.cert.rpki_manifest()
952 ));
953 }
954 FilterPolicy::Accept => { }
955 }
956 }
957
958 let (crl_uri, crl, crl_bytes) = match self.validate_collected_crl(
959 &ee_cert, &content, repository
960 )? {
961 Some(some) => some,
962 None => return Ok(None)
963 };
964
965 self.metrics.valid_manifests += 1;
966
967 Ok(Some(ValidPointManifest {
968 ee_cert, content, crl_uri, crl, manifest_bytes, crl_bytes,
969 metrics: Default::default(),
970 }))
971 }
972
973 fn check_collected_is_newer(
983 &mut self,
984 collected: &ValidPointManifest,
985 stored: &mut StoredPoint,
986 ) -> Result<bool, Failed> {
987 let Some(stored_mft) = stored.manifest() else {
988 return Ok(true);
989 };
990
991 if collected.content.manifest_number() > stored_mft.manifest_number
994 && collected.content.this_update() > stored_mft.this_update
995 {
996 return Ok(true);
997 }
998
999 if let Ok(mft) = Manifest::decode(
1001 stored_mft.manifest.clone(), self.run.validation.strict
1002 ) {
1003 if mft.content().manifest_number() == stored_mft.manifest_number
1004 && mft.content().this_update() == stored_mft.this_update
1005 {
1006 if collected.content.manifest_number()
1011 <= stored_mft.manifest_number
1012 {
1013 self.log.warn(format_args!(
1014 "manifest {}: manifest number is not greater than in \
1015 stored version. Using stored publication point.",
1016 self.cert.rpki_manifest(),
1017 ));
1018 return Ok(false);
1019 }
1020 if collected.content.this_update()
1021 <= stored_mft.this_update
1022 {
1023 self.log.warn(format_args!(
1024 "manifest {}: manifest thisUpdate is not later than in \
1025 stored version. Using stored publication point.",
1026 self.cert.rpki_manifest(),
1027 ));
1028 return Ok(false);
1029 }
1030 }
1031 }
1032
1033 stored.reject()?;
1036 Ok(true)
1037 }
1038
1039 fn validate_collected_crl(
1048 &mut self,
1049 ee_cert: &ResourceCert,
1050 manifest: &ManifestContent,
1051 repository: &collector::Repository
1052 ) -> Result<Option<(uri::Rsync, Crl, Bytes)>, RunFailed> {
1053 let crl_uri = match ee_cert.crl_uri() {
1056 Some(some) if some.ends_with(".crl") => some.clone(),
1058 Some(some) => {
1059 self.metrics.invalid_manifests += 1;
1060 self.log.warn(format_args!("invalid CRL URI {}", some));
1061 return Ok(None)
1062 }
1063 None => {
1064 self.metrics.invalid_manifests += 1;
1065 self.log.warn(format_args!(
1066 "missing CRL URI on manifest {}",
1067 self.cert.rpki_manifest()
1068 ));
1069 return Ok(None)
1070 }
1071 };
1072 let crl_name = match crl_uri.relative_to(self.cert.ca_repository()) {
1073 Some(name) => name,
1074 None => {
1075 self.metrics.invalid_manifests += 1;
1076 self.log.warn(format_args!(
1077 "CRL URI {crl_uri} outside repository directory."
1078 ));
1079 return Ok(None)
1080 }
1081 };
1082
1083 let mut crl_bytes = None;
1086 for item in manifest.iter() {
1087 let (file, hash) = item.into_pair();
1088 if file == crl_name {
1089 let bytes = match repository.load_object(&crl_uri)? {
1090 Some(bytes) => bytes,
1091 None => {
1092 self.metrics.invalid_crls += 1;
1093 self.log.warn(format_args!(
1094 "{crl_uri}: failed to load."
1095 ));
1096 return Ok(None)
1097 }
1098 };
1099 let hash = ManifestHash::new(hash, manifest.file_hash_alg());
1100 if hash.verify(&bytes).is_err() {
1101 self.metrics.invalid_crls += 1;
1102 self.log.warn(format_args!(
1103 "file {crl_uri} has wrong hash."
1104 ));
1105 return Ok(None)
1106 }
1107 crl_bytes = Some(bytes);
1108 }
1109 }
1110 let crl_bytes = match crl_bytes {
1111 Some(some) => some,
1112 None => {
1113 self.metrics.invalid_crls += 1;
1114 self.log.warn(format_args!("CRL not listed on manifest."));
1115 return Ok(None)
1116 }
1117 };
1118
1119 let mut crl = match Crl::decode(crl_bytes.clone()) {
1121 Ok(crl) => crl,
1122 Err(_) => {
1123 self.metrics.invalid_crls += 1;
1124 self.log.warn(format_args!(
1125 "CRL {crl_uri}: failed to decode."
1126 ));
1127 return Ok(None)
1128 }
1129 };
1130 if let Err(err) = crl.verify_signature(
1131 self.cert.cert().subject_public_key_info()
1132 ) {
1133 self.metrics.invalid_crls += 1;
1134 self.log.warn(format_args!("CRL {crl_uri}: {err}."));
1135 return Ok(None)
1136 }
1137 if crl.is_stale() {
1138 self.metrics.stale_crls += 1;
1139 match self.run.validation.stale {
1140 FilterPolicy::Reject => {
1141 self.log.warn(format_args!(
1142 "rejecting stale CRL {crl_uri}."
1143 ));
1144 return Ok(None)
1145 }
1146 FilterPolicy::Warn => {
1147 self.log.warn(format_args!("stale CRL {crl_uri}."));
1148 }
1149 FilterPolicy::Accept => { }
1150 }
1151 }
1152
1153 if manifest.len() > CRL_CACHE_LIMIT {
1155 crl.cache_serials()
1156 }
1157
1158 if crl.contains(ee_cert.serial_number()) {
1160 self.metrics.invalid_manifests += 1;
1161 self.log.warn(format_args!(
1162 "manifest certificate has been revoked."
1163 ));
1164 return Ok(None)
1165 }
1166
1167 self.metrics.valid_crls += 1;
1169 Ok(Some((crl_uri, crl, crl_bytes)))
1170 }
1171
1172 fn process_stored(
1179 mut self,
1180 mut store: StoredPoint,
1181 metrics: &mut RunMetrics,
1182 ) -> Result<Vec<CaTask<P::PubPoint>>, Failed> {
1183 let manifest = match store.manifest() {
1184 Some(manifest) => manifest,
1185 None => {
1186 self.log.warn(format_args!(
1189 "no valid manifest {} found.",
1190 self.cert.rpki_manifest()
1191 ));
1192 self.metrics.missing_manifests += 1;
1193 self.reject_point(metrics);
1194 return Ok(Vec::new())
1195 }
1196 };
1197
1198 let mut manifest = match self.validate_stored_manifest(manifest) {
1199 Ok(manifest) => manifest,
1200 Err(_) => {
1201 self.reject_point(metrics);
1202 return Ok(Vec::new())
1203 }
1204 };
1205
1206 manifest.point_validity(&mut self.processor);
1207
1208 let mut ca_tasks = Vec::new();
1209 for object in &mut store {
1210 let object = match object {
1211 Ok(object) => object,
1212 Err(err) => {
1213 if err.is_fatal() {
1214 error!(
1215 "Fatal: failed to read from {}: {}",
1216 store.path().display(), err
1217 );
1218 return Err(Failed)
1219 }
1220 else {
1221 debug!(
1222 "Ignoring invalid stored publication point \
1223 at {}: {}",
1224 store.path().display(), err
1225 );
1226 self.reject_point(metrics);
1227 return Ok(Vec::new())
1228 }
1229 }
1230 };
1231 if !self.process_object(
1232 &object.uri, object.content.clone(),
1233 &mut manifest, &mut ca_tasks
1234 )? {
1235 self.reject_point(metrics);
1236 return Ok(Vec::new())
1237 }
1238 }
1239
1240 self.accept_point(manifest, metrics);
1241 Ok(ca_tasks)
1242 }
1243
1244 fn validate_stored_manifest(
1251 &mut self,
1252 stored_manifest: &StoredManifest,
1253 ) -> Result<ValidPointManifest, Failed> {
1254 let manifest = match Manifest::decode(
1256 stored_manifest.manifest.clone(), self.run.validation.strict
1257 ) {
1258 Ok(manifest) => manifest,
1259 Err(_) => {
1260 self.metrics.invalid_manifests += 1;
1261 self.log.warn(format_args!(
1262 "failed to decode manifest {}.",
1263 self.cert.rpki_manifest(),
1264 ));
1265 return Err(Failed);
1266 }
1267 };
1268 let (ee_cert, content) = match manifest.validate(
1269 self.cert.cert(), self.run.validation.strict
1270 ) {
1271 Ok(some) => some,
1272 Err(err) => {
1273 self.log.warn(format_args!(
1274 "manifest {}: {}.",
1275 self.cert.rpki_manifest(), err
1276 ));
1277 self.metrics.invalid_manifests += 1;
1278 return Err(Failed);
1279 }
1280 };
1281 if content.is_stale() {
1282 self.metrics.stale_manifests += 1;
1283 match self.run.validation.stale {
1284 FilterPolicy::Reject => {
1285 self.log.warn(format_args!(
1286 "rejecting stale manifest {}",
1287 self.cert.rpki_manifest()
1288 ));
1289 self.metrics.invalid_manifests += 1;
1290 return Err(Failed);
1291 }
1292 FilterPolicy::Warn => {
1293 self.log.warn(format_args!(
1294 "stale manifest {}", self.cert.rpki_manifest()
1295 ));
1296 }
1297 FilterPolicy::Accept => { }
1298 }
1299 }
1300
1301 let crl_uri = match ee_cert.crl_uri() {
1303 Some(uri) => uri.clone(),
1304 None => {
1305 self.log.warn(format_args!("manifest without CRL URI."));
1307 self.metrics.invalid_manifests += 1;
1308 return Err(Failed)
1309 }
1310 };
1311
1312 let mut crl = match Crl::decode(stored_manifest.crl.clone()) {
1314 Ok(crl) => crl,
1315 Err(_) => {
1316 self.metrics.invalid_manifests += 1;
1317 self.metrics.invalid_crls += 1;
1318 self.log.warn(format_args!(
1319 "failed to decode CRL {crl_uri}."
1320 ));
1321 return Err(Failed)
1322 }
1323 };
1324 if let Err(err) = crl.verify_signature(
1325 self.cert.cert().subject_public_key_info()
1326 ) {
1327 self.log.warn(format_args!("CRL {crl_uri}: {err}."));
1328 self.metrics.invalid_manifests += 1;
1329 self.metrics.invalid_crls += 1;
1330 return Err(Failed)
1331 }
1332 if crl.is_stale() {
1333 self.metrics.stale_crls += 1;
1334 match self.run.validation.stale {
1335 FilterPolicy::Reject => {
1336 self.log.warn(format_args!(
1337 "rejecting stale CRL {crl_uri}."
1338 ));
1339 self.metrics.invalid_manifests += 1;
1340 self.metrics.invalid_crls += 1;
1341 return Err(Failed)
1342 }
1343 FilterPolicy::Warn => {
1344 self.log.warn(format_args!(
1345 "stale CRL {crl_uri}."
1346 ));
1347 }
1348 FilterPolicy::Accept => { }
1349 }
1350 }
1351
1352 if content.len() > CRL_CACHE_LIMIT {
1354 crl.cache_serials()
1355 }
1356
1357 if crl.contains(ee_cert.serial_number()) {
1362 self.log.warn(format_args!(
1363 "manifest {}: certificate has been revoked.",
1364 self.cert.rpki_manifest()
1365 ));
1366 self.metrics.invalid_manifests += 1;
1367 return Err(Failed)
1368 }
1369
1370 self.metrics.valid_manifests += 1;
1371 self.metrics.valid_crls += 1;
1372 Ok(ValidPointManifest {
1373 ee_cert, content, crl_uri, crl,
1374 manifest_bytes: stored_manifest.manifest.clone(),
1375 crl_bytes: stored_manifest.crl.clone(),
1376 metrics: Default::default(),
1377 })
1378 }
1379
1380 fn accept_point(
1383 mut self,
1384 manifest: ValidPointManifest,
1385 metrics: &mut RunMetrics,
1386 ) {
1387 self.metrics.valid_points += 1;
1388 self.metrics += manifest.metrics;
1389 self.apply_metrics(metrics);
1390 self.processor.commit();
1391 }
1392
1393 fn reject_point(
1394 mut self,
1395 metrics: &mut RunMetrics,
1396 ) {
1397 self.metrics.rejected_points += 1;
1398 self.apply_metrics(metrics);
1399 self.processor.cancel(self.cert);
1400 let log = self.log.into_book();
1401 if !log.is_empty() {
1402 metrics.append_log(self.cert.ca_repository().clone(), log);
1403 }
1404 }
1405
1406 fn apply_metrics(
1407 &mut self,
1408 metrics: &mut RunMetrics,
1409 ) {
1410 let repository_index = self.repository_index.unwrap_or_else(|| {
1411 metrics.repository_index(self.cert)
1412 });
1413 self.processor.repository_index(repository_index);
1414 metrics.apply(
1415 &self.metrics,
1416 repository_index,
1417 self.cert.tal,
1418 );
1419 }
1420
1421 fn process_object(
1426 &mut self,
1427 uri: &uri::Rsync,
1428 content: Bytes,
1429 manifest: &mut ValidPointManifest,
1430 ca_task: &mut Vec<CaTask<P::PubPoint>>,
1431 ) -> Result<bool, Failed> {
1432 if !self.processor.want(uri)? {
1433 return Ok(true)
1434 }
1435
1436 if uri.ends_with(".cer") {
1437 self.process_cer(uri, content, manifest, ca_task)?;
1438 }
1439 else if uri.ends_with(".roa") {
1440 self.process_roa(uri, content, manifest)?;
1441 }
1442 else if uri.ends_with(".asa") {
1443 self.process_aspa(uri, content, manifest)?;
1444 }
1445 else if uri.ends_with(".gbr") {
1446 self.process_gbr(uri, content, manifest)?;
1447 }
1448 else if uri.ends_with(".crl") {
1449 if *uri != manifest.crl_uri {
1450 self.log.warn(format_args!("stray CRL {uri}."));
1451 manifest.metrics.stray_crls += 1;
1452 }
1453 }
1454 else {
1455 manifest.metrics.others += 1;
1456 self.log.warn(format_args!(
1457 "object {uri}: unknown type."
1458 ));
1459 }
1460 Ok(true)
1461 }
1462
1463 fn process_cer(
1465 &mut self,
1466 uri: &uri::Rsync,
1467 content: Bytes,
1468 manifest: &mut ValidPointManifest,
1469 ca_task: &mut Vec<CaTask<P::PubPoint>>,
1470 ) -> Result<(), Failed> {
1471 let cert = match Cert::decode(content) {
1472 Ok(cert) => cert,
1473 Err(_) => {
1474 manifest.metrics.invalid_certs += 1;
1475 self.log.warn(format_args!(
1476 "certificate {uri}: failed to decode."
1477 ));
1478 return Ok(())
1479 }
1480 };
1481
1482 if cert.key_usage() == KeyUsage::Ca {
1483 self.process_ca_cer(uri, cert, manifest, ca_task)
1484 }
1485 else {
1486 self.process_router_cert(uri, cert, manifest)
1487 }
1488 }
1489
1490 #[allow(clippy::too_many_arguments)]
1492 fn process_ca_cer(
1493 &mut self, uri: &uri::Rsync, cert: Cert,
1494 manifest: &mut ValidPointManifest,
1495 ca_task: &mut Vec<CaTask<P::PubPoint>>,
1496 ) -> Result<(), Failed> {
1497 if self.cert.check_loop(&cert).is_err() {
1498 self.log.warn(format_args!(
1499 "CA certificate {uri}: certificate loop detected."
1500 ));
1501 manifest.metrics.invalid_certs += 1;
1502 return Ok(())
1503 }
1504 let cert = match cert.validate_ca(
1505 self.cert.cert(), self.run.validation.strict
1506 ) {
1507 Ok(cert) => cert,
1508 Err(err) => {
1509 self.log.warn(format_args!(
1510 "CA certificagte {uri}: {err}."
1511 ));
1512 manifest.metrics.invalid_certs += 1;
1513 return Ok(())
1514 }
1515 };
1516 if let Err(err) = manifest.check_crl(&cert) {
1517 self.log.warn(format_args!(
1518 "CA certificate {uri}: {err}."
1519 ));
1520 manifest.metrics.invalid_certs += 1;
1521 return Ok(())
1522 }
1523
1524 let cert = match CaCert::chain(
1525 self.cert, uri.clone(), cert, self.run.validation.max_ca_depth,
1526 ) {
1527 Ok(cert) => cert,
1528 Err(_) => {
1529 manifest.metrics.invalid_certs += 1;
1530 return Ok(())
1531 }
1532 };
1533
1534 manifest.metrics.valid_ca_certs += 1;
1535
1536 let processor = match self.processor.process_ca(
1537 uri, &cert
1538 )? {
1539 Some(processor) => processor,
1540 None => return Ok(())
1541 };
1542
1543 let defer = match self.run.collector.as_ref() {
1546 Some(collector) => !collector.was_updated(&cert),
1547 None => false,
1548 };
1549
1550 let repository_index = if cert.repository_switch() {
1552 None
1553 }
1554 else {
1555 self.repository_index
1556 };
1557
1558 ca_task.push(CaTask {
1559 cert, processor, repository_index, defer
1560 });
1561 Ok(())
1562 }
1563
1564 fn process_router_cert(
1566 &mut self, uri: &uri::Rsync, cert: Cert,
1567 manifest: &mut ValidPointManifest,
1568 ) -> Result<(), Failed> {
1569 if let Err(err) = cert.validate_router(
1570 self.cert.cert(), self.run.validation.strict
1571 ) {
1572 self.log.warn(format_args!(
1573 "router certificate {uri}: {err}."
1574 ));
1575 manifest.metrics.invalid_certs += 1;
1576 return Ok(())
1577 };
1578 if let Err(err) = manifest.check_crl(&cert) {
1579 self.log.warn(format_args!(
1580 "router certificate {uri}: {err}."
1581 ));
1582 manifest.metrics.invalid_certs += 1;
1583 return Ok(())
1584 }
1585 manifest.metrics.valid_router_certs += 1;
1586 self.processor.process_router_cert(uri, cert, self.cert)?;
1587 Ok(())
1588 }
1589
1590 fn process_roa(
1592 &mut self, uri: &uri::Rsync, content: Bytes,
1593 manifest: &mut ValidPointManifest,
1594 ) -> Result<(), Failed> {
1595 let roa = match Roa::decode(
1596 content, self.run.validation.strict
1597 ) {
1598 Ok(roa) => roa,
1599 Err(_) => {
1600 manifest.metrics.invalid_roas += 1;
1601 self.log.warn(format_args!(
1602 "ROA {uri}: failed to decode."
1603 ));
1604 return Ok(())
1605 }
1606 };
1607 match roa.process(
1608 self.cert.cert(),
1609 self.run.validation.strict,
1610 |cert| manifest.check_crl(cert)
1611 ) {
1612 Ok((cert, route)) => {
1613 manifest.metrics.valid_roas += 1;
1614 self.processor.process_roa(uri, cert, route)?
1615 }
1616 Err(err) => {
1617 manifest.metrics.invalid_roas += 1;
1618 self.log.warn(format_args!(
1619 "ROA {uri}: {err}."
1620 ))
1621 }
1622 }
1623 Ok(())
1624 }
1625
1626 #[allow(unused_variables)]
1628 fn process_aspa(
1629 &mut self, uri: &uri::Rsync, content: Bytes,
1630 manifest: &mut ValidPointManifest,
1631 ) -> Result<(), Failed> {
1632 let aspa = match Aspa::decode(
1633 content, self.run.validation.strict
1634 ) {
1635 Ok(aspa) => aspa,
1636 Err(err) => {
1637 manifest.metrics.invalid_aspas += 1;
1638 self.log.warn(format_args!(
1639 "ASPA {uri}: failed to decode."
1640 ));
1641 return Ok(())
1642 }
1643 };
1644 match aspa.process(
1645 self.cert.cert(),
1646 self.run.validation.strict,
1647 |cert| manifest.check_crl(cert)
1648 ) {
1649 Ok((cert, aspa)) => {
1650 manifest.metrics.valid_aspas += 1;
1651 self.processor.process_aspa(uri, cert, aspa)?
1652 }
1653 Err(err) => {
1654 manifest.metrics.invalid_aspas += 1;
1655 self.log.warn(format_args!(
1656 "ASPA {uri}: {err}."
1657 ))
1658 }
1659 }
1660 Ok(())
1661 }
1662
1663 fn process_gbr(
1665 &mut self, uri: &uri::Rsync, content: Bytes,
1666 manifest: &mut ValidPointManifest,
1667 ) -> Result<(), Failed> {
1668 let obj = match SignedObject::decode(
1669 content, self.run.validation.strict
1670 ) {
1671 Ok(obj) => obj,
1672 Err(_) => {
1673 manifest.metrics.invalid_gbrs += 1;
1674 self.log.warn(format_args!(
1675 "GBR {uri}: failed to decode."
1676 ));
1677 return Ok(())
1678 }
1679 };
1680 match obj.process(
1681 self.cert.cert(),
1682 self.run.validation.strict,
1683 |cert| manifest.check_crl(cert)
1684 ) {
1685 Ok((cert, content)) => {
1686 manifest.metrics.valid_gbrs += 1;
1687 self.processor.process_gbr(uri, cert, content)?
1688 }
1689 Err(err) => {
1690 manifest.metrics.invalid_gbrs += 1;
1691 self.log.warn(format_args!(
1692 "GBR {uri}: {err}."
1693 ))
1694 }
1695 }
1696 Ok(())
1697 }
1698}
1699
1700
1701#[derive(Clone, Debug)]
1705struct ValidPointManifest {
1706 ee_cert: ResourceCert,
1708
1709 content: ManifestContent,
1711
1712 crl_uri: uri::Rsync,
1717
1718 crl: Crl,
1720
1721 manifest_bytes: Bytes,
1723
1724 crl_bytes: Bytes,
1726
1727 metrics: PublicationMetrics,
1732}
1733
1734impl ValidPointManifest {
1735 fn check_crl(&self, cert: &Cert) -> Result<(), ValidationError> {
1737 let crl_uri = match cert.crl_uri() {
1738 Some(some) => some,
1739 None => {
1740 return Err(InspectionError::new(
1741 "certificate has no CRL URI"
1742 ).into())
1743 }
1744 };
1745
1746 if *crl_uri != self.crl_uri {
1747 return Err(InspectionError::new(
1748 "certificate's CRL differs from manifest's"
1749 ).into())
1750 }
1751
1752 if self.crl.contains(cert.serial_number()) {
1753 return Err(InspectionError::new(
1754 "certificate has been revoked"
1755 ).into())
1756 }
1757
1758 Ok(())
1759 }
1760
1761 fn point_validity(&self, processor: &mut impl ProcessPubPoint) {
1763 processor.point_validity(
1764 self.ee_cert.validity(),
1765 cmp::min(
1766 self.content.next_update(),
1767 self.crl.next_update(),
1768 )
1769 )
1770 }
1771}
1772
1773
1774enum Task<'a, P> {
1778 Tal(TalTask<'a>),
1780
1781 Ca(CaTask<P>),
1783}
1784
1785impl<P> fmt::Debug for Task<'_, P> {
1786 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1787 match *self {
1788 Task::Tal(ref inner) => {
1789 write!(f, "TalTask {{ tal: {} }}", inner.tal.info().name())
1790 }
1791 Task::Ca(ref inner) => {
1792 write!(
1793 f, "CaTask {{ ca_repository: {} }}",
1794 inner.cert.ca_repository
1795 )
1796 }
1797 }
1798 }
1799}
1800
1801
1802struct TalTask<'a> {
1806 tal: &'a Tal,
1808
1809 index: usize,
1811}
1812
1813
1814struct CaTask<P> {
1818 cert: Arc<CaCert>,
1820
1821 processor: P,
1823
1824 repository_index: Option<usize>,
1826
1827 defer: bool,
1832}
1833
1834
1835#[derive(Clone, Debug)]
1842pub struct CaCert {
1843 cert: ResourceCert,
1845
1846 uri: TalUri,
1853
1854 ca_repository: uri::Rsync,
1856
1857 rpki_manifest: uri::Rsync,
1859
1860 parent: Option<Arc<CaCert>>,
1864
1865 chain_len: usize,
1867
1868 pub(crate) tal: usize,
1871}
1872
1873impl CaCert {
1874 pub fn root(
1876 cert: ResourceCert, uri: TalUri, tal: usize
1877 ) -> Result<Arc<Self>, Failed> {
1878 Self::new(cert, uri, None, 0, tal)
1879 }
1880
1881 pub fn chain(
1883 issuer: &Arc<Self>,
1884 uri: uri::Rsync,
1885 cert: ResourceCert,
1886 max_depth: usize,
1887 ) -> Result<Arc<Self>, Failed> {
1888 let chain_len = match issuer.chain_len.checked_add(1) {
1889 Some(chain_len) => chain_len,
1890 None => {
1891 error!(
1892 "CA {uri}: CA depth overrun."
1893 );
1894 return Err(Failed)
1895 }
1896 };
1897 if chain_len > max_depth {
1898 error!(
1899 "CA {uri}: CA depth overrun."
1900 );
1901 return Err(Failed)
1902 }
1903 Self::new(
1904 cert, TalUri::Rsync(uri),
1905 Some(issuer.clone()), chain_len,
1906 issuer.tal
1907 )
1908 }
1909
1910 fn new(
1912 cert: ResourceCert,
1913 uri: TalUri,
1914 parent: Option<Arc<Self>>,
1915 chain_len: usize,
1916 tal: usize,
1917 ) -> Result<Arc<Self>, Failed> {
1918 let ca_repository = match cert.ca_repository() {
1919 Some(uri) => uri.clone(),
1920 None => {
1921 error!(
1924 "CA cert {uri} has no repository URI. \
1925 Why has it not been rejected yet?"
1926 );
1927 return Err(Failed)
1928 }
1929 };
1930
1931 let rpki_manifest = match cert.rpki_manifest() {
1932 Some(uri) => uri.clone(),
1933 None => {
1934 error!(
1937 "CA cert {uri} has no manifest URI. \
1938 Why has it not been rejected yet?"
1939 );
1940 return Err(Failed)
1941 }
1942 };
1943 Ok(Arc::new(CaCert {
1944 cert, uri, ca_repository, rpki_manifest, parent, chain_len, tal
1945 }))
1946 }
1947
1948 pub fn check_loop(&self, cert: &Cert) -> Result<(), Failed> {
1950 self._check_loop(cert.subject_key_identifier())
1951 }
1952
1953 fn _check_loop(&self, key_id: KeyIdentifier) -> Result<(), Failed> {
1958 if self.cert.subject_key_identifier() == key_id {
1959 Err(Failed)
1960 }
1961 else if let Some(ref parent) = self.parent {
1962 parent._check_loop(key_id)
1963 }
1964 else {
1965 Ok(())
1966 }
1967 }
1968
1969 pub fn cert(&self) -> &ResourceCert {
1971 &self.cert
1972 }
1973
1974 pub fn uri(&self) -> &TalUri {
1981 &self.uri
1982 }
1983
1984 pub fn ca_repository(&self) -> &uri::Rsync {
1986 &self.ca_repository
1987 }
1988
1989 pub fn rpki_manifest(&self) -> &uri::Rsync {
1991 &self.rpki_manifest
1992 }
1993
1994 pub fn rpki_notify(&self) -> Option<&uri::Https> {
1996 self.cert.rpki_notify()
1997 }
1998
1999 pub(crate) fn repository_switch(&self) -> bool {
2005 let parent = match self.parent.as_ref() {
2006 Some(parent) => parent,
2007 None => return true,
2008 };
2009
2010 match self.rpki_notify() {
2011 Some(rpki_notify) => {
2012 Some(rpki_notify) != parent.rpki_notify()
2013 }
2014 None => {
2015 self.ca_repository.module() != parent.ca_repository.module()
2016 }
2017 }
2018 }
2019}
2020
2021
2022#[derive(Debug, Default)]
2026struct RunMetrics {
2027 tals: Vec<PublicationMetrics>,
2029
2030 repositories: Vec<PublicationMetrics>,
2032
2033 publication: PublicationMetrics,
2035
2036 log_books: Vec<(uri::Rsync, LogBook)>,
2038
2039 repository_indexes: Arc<Mutex<HashMap<String, usize>>>,
2044}
2045
2046impl RunMetrics {
2047 pub fn fork(&self) -> Self {
2049 RunMetrics {
2050 tals: Default::default(),
2051 repositories: Default::default(),
2052 publication: Default::default(),
2053 repository_indexes: self.repository_indexes.clone(),
2054 log_books: Default::default(),
2055 }
2056 }
2057
2058 pub fn repository_index(&self, cert: &CaCert) -> usize {
2062 let uri = cert.rpki_notify().map(|uri| {
2063 Cow::Borrowed(uri.as_str())
2064 }).unwrap_or_else(|| {
2065 cert.ca_repository.canonical_module()
2066 });
2067
2068 let mut repository_indexes = self.repository_indexes.lock().unwrap();
2069 if let Some(index) = repository_indexes.get(uri.as_ref()) {
2070 return *index
2071 }
2072
2073 let index = repository_indexes.len();
2074 repository_indexes.insert(uri.into_owned(), index);
2075 index
2076 }
2077
2078 pub fn apply(
2080 &mut self, metrics: &PublicationMetrics,
2081 repository_index: usize, tal_index: usize,
2082 ) {
2083 while self.repositories.len() <= repository_index {
2084 self.repositories.push(Default::default())
2085 }
2086 self.repositories[repository_index] += metrics;
2087
2088 while self.tals.len() <= tal_index {
2089 self.tals.push(Default::default())
2090 }
2091 self.tals[tal_index] += metrics;
2092
2093 self.publication += metrics;
2094 }
2095
2096 pub fn append_log(&mut self, uri: uri::Rsync, book: LogBook) {
2098 self.log_books.push((uri, book))
2099 }
2100
2101 pub fn prepare_final(&self, target: &mut Metrics) {
2103 let mut indexes: Vec<_>
2104 = self.repository_indexes.lock().unwrap().iter().map(|item| {
2105 (item.0.clone(), *item.1)
2106 }).collect();
2107 indexes.sort_by_key(|(_, idx)| *idx);
2108 target.repositories = indexes.into_iter().map(|(uri, _)| {
2109 RepositoryMetrics::new(uri)
2110 }).collect();
2111 }
2112
2113 pub fn collapse(mut self, target: &mut Metrics) {
2121 for (target, metric) in target.tals.iter_mut().zip(self.tals) {
2122 target.publication += metric
2123 }
2124 for (target, metric) in target.repositories.iter_mut().zip(
2125 self.repositories
2126 ) {
2127 target.publication += metric
2128 }
2129 target.publication += self.publication;
2130 target.pub_point_logs.append(&mut self.log_books);
2131 }
2132}
2133
2134
2135pub trait ProcessRun: Send + Sync {
2139 type PubPoint: ProcessPubPoint;
2141
2142 fn process_ta(
2154 &self, tal: &Tal, uri: &TalUri, cert: &CaCert, tal_index: usize
2155 ) -> Result<Option<Self::PubPoint>, Failed>;
2156}
2157
2158
2159pub trait ProcessPubPoint: Sized + Send + Sync {
2163 fn repository_index(&mut self, repository_index: usize) {
2165 let _ = repository_index;
2166 }
2167
2168 fn point_validity(
2174 &mut self,
2175 manifest_ee: Validity,
2176 stale: Time,
2177 ) {
2178 let _ = (manifest_ee, stale);
2179 }
2180
2181 fn want(&self, uri: &uri::Rsync) -> Result<bool, Failed>;
2187
2188 fn process_ca(
2199 &mut self, uri: &uri::Rsync, cert: &CaCert,
2200 ) -> Result<Option<Self>, Failed>;
2201
2202 fn process_router_cert(
2207 &mut self, uri: &uri::Rsync, cert: Cert, ca_cert: &CaCert,
2208 ) -> Result<(), Failed> {
2209 let _ = (uri, cert, ca_cert);
2210 Ok(())
2211 }
2212
2213 fn process_roa(
2218 &mut self,
2219 uri: &uri::Rsync,
2220 cert: ResourceCert,
2221 route: RouteOriginAttestation
2222 ) -> Result<(), Failed> {
2223 let _ = (uri, cert, route);
2224 Ok(())
2225 }
2226
2227 fn process_aspa(
2232 &mut self,
2233 uri: &uri::Rsync,
2234 cert: ResourceCert,
2235 aspa: AsProviderAttestation,
2236 ) -> Result<(), Failed> {
2237 let _ = (uri, cert, aspa);
2238 Ok(())
2239 }
2240
2241 fn process_gbr(
2249 &mut self,
2250 uri: &uri::Rsync,
2251 cert: ResourceCert,
2252 content: Bytes
2253 ) -> Result<(), Failed> {
2254 let _ = (uri, cert, content);
2255 Ok(())
2256 }
2257
2258 fn restart(&mut self) -> Result<(), Failed>;
2267
2268 fn commit(self);
2273
2274 fn cancel(self, _cert: &CaCert) {
2281 }
2282}
2283
2284
#[cfg(test)]
mod test {
    use super::*;

    /// Checks that an engine with an empty cache can be dumped.
    #[test]
    fn dump_empty_cache() {
        // The outcome of process initialization is deliberately ignored.
        let _ = crate::process::Process::init();

        let cache_dir = tempfile::tempdir().unwrap();
        let target_dir = tempfile::tempdir().unwrap();
        let dump_path = target_dir.path().join("dump");

        let mut config = Config::default_with_paths(
            Default::default(), cache_dir.path().into()
        );
        config.rsync_command = "echo".into();
        config.rsync_args = Some(vec!["some".into()]);

        let engine = Engine::new(&config, true).unwrap();
        engine.dump(&dump_path).unwrap();
    }
}
2306