dtn7/dtnd/
daemon.rs

1use std::convert::TryFrom;
2
3use super::{httpd, janitor};
4use crate::cla::ecla::processing::start_ecla;
5use crate::cla::ConvergenceLayerAgent;
6use crate::core::application_agent::SimpleApplicationAgent;
7use crate::dtnconfig::DtnConfig;
8use crate::ipnd::neighbour_discovery;
9use crate::{cla_add, peers_add};
10use crate::{CLAS, CONFIG, DTNCORE, STORE};
11use bp7::EndpointID;
12use log::{error, info, warn};
13
14/*
15use crate::core::core::DtnCore;
16use std::sync::mpsc;
17use std::sync::mpsc::{Receiver, Sender};
18
19#[derive(Debug)]
20pub enum DtnCmd {
21    DtnCore(Sender<DtnCmdResult>),
22}
23
24#[derive(Debug)]
25pub enum DtnCmdResult {
26    DtnCore(Sender<DtnCore>, DtnCore),
27}
28
29pub fn access_core<F>(tx: Sender<DtnCmd>, mut f: F)
30where
31    F: FnMut(&mut DtnCore),
32{
33    let (tx2, rx2) = mpsc::channel();
34    tx.send(DtnCmd::DtnCore(tx2)).unwrap();
35    let DtnCmdResult::DtnCore(tx3, mut core) = rx2.recv().expect("Couldn't access dtn core!");
36    f(&mut core);
37    tx3.send(core).expect("IPC Error");
38}
39
40fn spawn_core_daemon(rx: Receiver<DtnCmd>) {
41    for received in rx {
42        //println!("Got: {:?}", received);
43        match received {
44            DtnCmd::DtnCore(tx) => {
45                let (tx2, rx2) = mpsc::channel();
46                tx.send(DtnCmdResult::DtnCore(tx2, core))
47                    .expect("IPC Error");
48                core = rx2.recv().unwrap();
49            }
50        };
51    }
52}*/
53
54// this function is only called once during startup
55// therefore it should be safe to hold the lock
56#[allow(clippy::await_holding_lock)]
57async fn start_convergencylayers() {
58    info!("Starting convergency layers");
59
60    for cl in &mut (*CLAS.lock()) {
61        info!("Setup {}", cl);
62        cl.setup().await;
63    }
64}
65
66pub async fn start_dtnd(cfg: DtnConfig) -> anyhow::Result<()> {
67    {
68        (*CONFIG.lock()).set(cfg);
69    }
70    info!("Local Node ID: {}", CONFIG.lock().host_eid);
71
72    info!("Work Dir: {:?}", CONFIG.lock().workdir);
73
74    let db = CONFIG.lock().db.clone();
75    info!("DB Backend: {}", db);
76
77    (*STORE.lock()) = crate::core::store::new(&db);
78
79    info!(
80        "Announcement Interval: {}",
81        humantime::format_duration(CONFIG.lock().announcement_interval)
82    );
83
84    info!(
85        "Janitor Interval: {}",
86        humantime::format_duration(CONFIG.lock().janitor_interval)
87    );
88
89    info!(
90        "Peer Timeout: {}",
91        humantime::format_duration(CONFIG.lock().peer_timeout)
92    );
93
94    info!("Web Port: {}", CONFIG.lock().webport);
95    info!("IPv4: {}", CONFIG.lock().v4);
96    info!("IPv6: {}", CONFIG.lock().v6);
97
98    info!(
99        "Generate Status Reports: {}",
100        CONFIG.lock().generate_status_reports
101    );
102
103    let routing = CONFIG.lock().routing.clone();
104    DTNCORE.lock().routing_agent = crate::routing::new(&routing);
105
106    info!("RoutingAgent: {}", routing);
107
108    let routing_options = CONFIG.lock().routing_settings.clone();
109    info!("RoutingOptions: {:?}", routing_options);
110
111    let clas = CONFIG.lock().clas.clone();
112    for (cla, local_settings) in &clas {
113        info!("Adding CLA: {:?}", cla);
114        cla_add(crate::cla::new(cla, Some(local_settings)));
115    }
116    if clas.is_empty() {
117        warn!("No CLAs configured!");
118    }
119
120    for s in &CONFIG.lock().statics {
121        info!(
122            "Adding static peer: {}://{}/{}",
123            s.cla_list[0].0,
124            s.addr,
125            s.eid.node().unwrap()
126        );
127        peers_add(s.clone());
128    }
129
130    let local_host_id = CONFIG.lock().host_eid.clone();
131    (*DTNCORE.lock())
132        .register_application_agent(SimpleApplicationAgent::with(local_host_id.clone()).into());
133    for e in &CONFIG.lock().endpoints {
134        let eid = if let Ok(eid) = EndpointID::try_from(e.clone()) {
135            // TODO: add check if non-local ID that service name is non-singleton ('~') for naming scheme dtn
136            eid
137        } else {
138            local_host_id
139                .new_endpoint(e)
140                .expect("Error constructing new endpoint")
141        };
142
143        (*DTNCORE.lock()).register_application_agent(SimpleApplicationAgent::with(eid).into());
144    }
145    start_convergencylayers().await;
146    if CONFIG.lock().janitor_interval.as_micros() != 0 {
147        janitor::spawn_janitor();
148    }
149
150    let dn = CONFIG.lock().disable_neighbour_discovery;
151    let interval = CONFIG.lock().announcement_interval.as_micros();
152    if !dn && interval != 0 {
153        if let Err(errmsg) = neighbour_discovery::spawn_neighbour_discovery().await {
154            error!("Error spawning service discovery: {:?}", errmsg);
155        }
156    }
157
158    if CONFIG.lock().ecla_enable {
159        let ecla_port = CONFIG.lock().ecla_tcp_port;
160        start_ecla(ecla_port).await;
161    }
162
163    httpd::spawn_httpd().await?;
164    Ok(())
165}