use crate::core::bundlepack::*;
use crate::core::*;
use crate::routing::RoutingNotifcation;
use crate::store_push_bundle;
use crate::store_remove;
use crate::CONFIG;
use crate::DTNCORE;
use crate::{is_local_node_id, STATS};
use crate::{routing_notify, routing_sender_for_bundle, store_add_bundle_if_unknown};
use bp7::administrative_record::*;
use bp7::bundle::*;
use bp7::flags::*;
use bp7::CanonicalData;
use bp7::BUNDLE_AGE_BLOCK;
use anyhow::{bail, Result};
use log::trace;
use log::{debug, info, warn};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::sync::mpsc::channel;
/// Stores `bndl` and hands it to [`transmit`] on a freshly spawned tokio task.
///
/// The function returns as soon as the task has been spawned. Errors from the
/// store or from the transmission are only logged as warnings and are not
/// reported back to the caller.
pub async fn send_bundle(bndl: Bundle) {
    tokio::spawn(async move {
        // Persist first; transmission is only attempted when storing succeeded.
        if let Some(err) = store_push_bundle(&bndl).err() {
            warn!("Transmission failed: {}", err);
            return;
        }

        let outcome = transmit(bndl.into()).await;
        if let Some(err) = outcome.err() {
            warn!("Transmission failed: {}", err);
        }
    });
}
pub fn send_through_task(bndl: Bundle) {
let rt = tokio::runtime::Handle::current();
let mut stask = crate::SENDERTASK.lock();
if stask.is_none() {
let (tx, rx) = channel(50);
tokio::spawn(sender_task(rx));
*stask = Some(tx);
}
let tx = stask.as_ref().unwrap().clone();
rt.spawn(async move { tx.send(bndl).await });
}
/// Queues `bndl` on the shared sender task, waiting for channel capacity.
///
/// The sender task (and its channel with capacity 50) is created lazily on
/// first use and remembered in `crate::SENDERTASK`. A failed send is logged as
/// a warning.
pub async fn send_through_task_async(bndl: Bundle) {
    // The lock guard is a temporary of this statement, so it is released
    // before the send below is awaited.
    let sender = crate::SENDERTASK
        .lock()
        .get_or_insert_with(|| {
            let (tx, rx) = channel(50);
            tokio::spawn(sender_task(rx));
            tx
        })
        .clone();

    if let Err(err) = sender.send(bndl).await {
        warn!("Transmission failed: {}", err);
    }
}
/// Worker loop behind the sender channel.
///
/// Passes every bundle received on `rx` to [`send_bundle`] and finishes once
/// `recv` yields `None`.
pub async fn sender_task(mut rx: tokio::sync::mpsc::Receiver<Bundle>) {
    loop {
        match rx.recv().await {
            Some(bndl) => {
                debug!("sending bundle through task channel");
                send_bundle(bndl).await;
            }
            None => break,
        }
    }
}
/// Entry point for bundles that originate at this node.
///
/// Marks the bundle as `DispatchPending`, syncs it, and then either
/// * deletes it when its source is neither `dtn:none` nor an endpoint
///   registered in `DTNCORE`, or
/// * passes it on to [`dispatch`].
///
/// # Errors
///
/// Propagates errors from `bp.sync()`, [`delete`] and [`dispatch`].
pub async fn transmit(mut bp: BundlePack) -> Result<()> {
    info!("Transmission of bundle requested: {}", bp.id());

    bp.add_constraint(Constraint::DispatchPending);
    bp.sync()?;

    // The endpoint lookup is only performed for sources other than dtn:none.
    let foreign_source = bp.source != bp7::EndpointID::none()
        && DTNCORE.lock().get_endpoint_mut(&bp.source).is_none();

    if foreign_source {
        info!(
            "Bundle's source is neither dtn:none nor an endpoint of this node: {} {}",
            bp.id(),
            bp.source
        );
        return delete(bp, NO_INFORMATION).await;
    }

    dispatch(bp).await
}
/// Handles a bundle that arrived from another node.
///
/// Steps, in order:
/// 1. Add the bundle to the store if it is unknown; duplicates are counted
///    and skipped.
/// 2. Mark it `DispatchPending` and, if requested and enabled in `CONFIG`,
///    emit a "received" status report.
/// 3. Inspect every canonical block whose type is not treated as known and
///    honor its block control flags (status report, delete bundle, remove
///    block).
/// 4. Store the possibly modified bundle and [`dispatch`] it. A dispatch
///    failure is only logged.
///
/// # Errors
///
/// Propagates errors from the store, from `bp.sync()` and from [`delete`].
pub async fn receive(mut bndl: Bundle) -> Result<()> {
    if store_add_bundle_if_unknown(&bndl)? {
        info!("Received new bundle: {}", bndl.id());
        STATS.lock().incoming += 1;
    } else {
        debug!(
            "Received an already known bundle, skip processing: {}",
            bndl.id()
        );
        STATS.lock().dups += 1;
        return Ok(());
    }

    let mut bp = BundlePack::from(&bndl);
    bp.add_constraint(Constraint::DispatchPending);
    bp.sync()?;

    if bndl
        .primary
        .bundle_control_flags
        .contains(BundleControlFlags::BUNDLE_STATUS_REQUEST_RECEPTION)
        && !bndl.is_administrative_record()
        && CONFIG.lock().generate_status_reports
    {
        send_status_report(&bp, RECEIVED_BUNDLE, NO_INFORMATION).await;
    }

    // Positions (within `bndl.canonicals`) of blocks that asked to be removed.
    // `enumerate` keeps the position in step with the vector even when known
    // blocks are skipped.
    let mut remove_idx = Vec::new();
    for (index, cb) in bndl.canonicals.iter().enumerate() {
        // NOTE(review): 11 is a magic number; block types below it are
        // treated as known here — confirm against the bp7 block type list.
        if cb.block_type < 11 {
            continue;
        }
        warn!(
            "Bundle's canonical block is unknown: {} {}",
            bp.id(),
            cb.block_type
        );

        let flags = cb.block_control_flags.flags();

        if flags.contains(BlockControlFlags::BLOCK_STATUS_REPORT) {
            info!(
                "Bundle's unknown canonical block requested reporting: {} {}",
                bp.id(),
                cb.block_type
            );
            if CONFIG.lock().generate_status_reports {
                send_status_report(&bp, RECEIVED_BUNDLE, BLOCK_UNINTELLIGIBLE).await;
            } else {
                info!("Generation of status reports disabled, ignoring request");
            }
        }

        if flags.contains(BlockControlFlags::BLOCK_DELETE_BUNDLE) {
            info!(
                "Bundle's unknown canonical block requested bundle deletion: {} {}",
                bp.id(),
                cb.block_type
            );
            delete(bp, BLOCK_UNINTELLIGIBLE).await?;
            return Ok(());
        }

        if flags.contains(BlockControlFlags::BLOCK_REMOVE) {
            info!(
                "Bundle's unknown canonical block requested to be removed: {} {} {}",
                bp.id(),
                cb.block_number,
                cb.block_type
            );
            remove_idx.push(index);
        }
    }

    // Remove back-to-front so that earlier removals do not shift the
    // positions of the blocks still waiting to be removed.
    for i in remove_idx.into_iter().rev() {
        bndl.canonicals.remove(i);
    }

    if let Err(err) = store_push_bundle(&bndl) {
        bail!("error adding received bundle: {} {}", bndl.id(), err);
    }

    if let Err(err) = dispatch(bp).await {
        warn!("Dispatching failed: {}", err);
    }
    Ok(())
}
pub async fn dispatch(bp: BundlePack) -> Result<()> {
info!("Dispatching bundle: {}", bp.id());
if let Err(err) = routing_notify(RoutingNotifcation::IncomingBundle(
store_get_bundle(bp.id()).unwrap(),
))
.await
{
error!("Error while sending incoming bundle notification: {}", err);
}
if (*DTNCORE.lock()).is_in_endpoints(&bp.destination)
{
local_delivery(bp.clone()).await?;
}
if !is_local_node_id(&bp.destination) {
tokio::spawn(forward(bp));
}
Ok(())
}
/// Increases the hop count of `bundle` and enforces its hop limit.
///
/// Bundles without a hop count block, or whose `hop_count_increase()` call
/// returns `false`, are returned untouched. When the limit is exceeded after
/// the increase, the bundle is deleted with reason `HOP_LIMIT_EXCEEDED` and an
/// error is returned.
///
/// # Panics
///
/// Panics when the hop count block carries no hop count data.
async fn handle_hop_count_block(mut bundle: Bundle) -> Result<Bundle> {
    let bid = bundle.id();

    // Determine the verdict first, then act on it once the block borrow ended.
    let mut limit_exceeded = false;
    if let Some(hc) = bundle.extension_block_by_type_mut(bp7::canonical::HOP_COUNT_BLOCK) {
        if hc.hop_count_increase() {
            let (hc_limit, hc_count) = hc
                .hop_count_get()
                .expect("hop count data missing from hop count block");
            debug!(
                "Bundle contains an hop count block: {} {} {}",
                &bid, hc_limit, hc_count
            );
            if hc.hop_count_exceeded() {
                warn!(
                    "Bundle contains an exceeded hop count block: {} {} {}",
                    &bid, hc_limit, hc_count
                );
                limit_exceeded = true;
            }
        }
    }

    if limit_exceeded {
        delete(bundle.into(), HOP_LIMIT_EXCEEDED).await?;
        bail!("hop count exceeded");
    }
    Ok(bundle)
}
async fn handle_primary_lifetime(bundle: &Bundle) -> Result<()> {
if bundle.primary.is_lifetime_exceeded() {
warn!(
"Dropping bundle, primary block lifetime is exceeded: {} {:?}",
bundle.id(),
bundle.primary
);
delete(bundle.into(), LIFETIME_EXPIRED).await?;
bail!("lifetime exceeded");
}
Ok(())
}
/// Adds the time this bundle has spent on this node to its bundle age block.
///
/// The elapsed time is the difference between the current system time and
/// the `received_time` stored in the bundle's metadata, both taken as
/// milliseconds since the Unix epoch.
///
/// Returns the new age in milliseconds, or `None` when
/// * the bundle has no bundle age block,
/// * no metadata is stored for the bundle,
/// * the block does not carry `CanonicalData::BundleAge`, or
/// * the resulting age is `0` (the block is left unchanged in that case).
///
/// # Panics
///
/// Panics when the system clock is set before the Unix epoch.
pub fn update_bundle_age(bundle: &mut Bundle) -> Option<u64> {
    let bid = bundle.id();
    let block = bundle.extension_block_by_type_mut(BUNDLE_AGE_BLOCK)?;
    let bp = store_get_metadata(&bid)?;

    let new_age = match block.data() {
        CanonicalData::BundleAge(age) => {
            let now_ms = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_millis() as u64;
            // Saturate instead of underflowing when the clock was moved back
            // after the bundle had been received (or overflowing on absurd
            // age values taken from the wire).
            let offset = now_ms.saturating_sub(bp.received_time);
            age.saturating_add(offset)
        }
        _ => return None,
    };

    if new_age == 0 {
        return None;
    }
    block.set_data(CanonicalData::BundleAge(new_age));
    Some(new_age)
}
/// Updates the bundle age block and drops bundles that outlived their lifetime.
///
/// The age returned by [`update_bundle_age`] is in milliseconds (it is built
/// by adding a millisecond offset), so it is converted with
/// `Duration::from_millis` before being compared to the primary block's
/// lifetime. When the age reaches the lifetime, the bundle is deleted with
/// reason `LIFETIME_EXPIRED` and an error is returned.
async fn handle_bundle_age_block(mut bundle: Bundle) -> Result<Bundle> {
    if let Some(age) = update_bundle_age(&mut bundle) {
        if std::time::Duration::from_millis(age) >= bundle.primary.lifetime {
            warn!("Dropping bundle, age exceeds lifetime: {}", bundle.id());
            delete(bundle.into(), LIFETIME_EXPIRED).await?;
            bail!("age block lifetime exceeded");
        }
    }
    Ok(bundle)
}
/// Stamps this node's EID (`CONFIG.host_eid`) into the previous node block.
///
/// An existing previous node block is updated in place; if the bundle has
/// none, a new one is created with block number `0` and empty block control
/// flags and added to the bundle.
///
/// # Panics
///
/// Panics when an existing previous node block carries no EID.
async fn handle_previous_node_block(mut bundle: Bundle) -> Result<Bundle> {
    let local_eid = CONFIG.lock().host_eid.clone();

    match bundle.extension_block_by_type_mut(bp7::canonical::PREVIOUS_NODE_BLOCK) {
        Some(pnb) => {
            // Remember the old value for the log line before overwriting it.
            let prev_eid = pnb
                .previous_node_get()
                .expect("no previoud node EID found!")
                .clone();
            pnb.previous_node_update(local_eid.clone());
            debug!(
                "Previous Node Block was updated: {} {} {}",
                bundle.id(),
                prev_eid,
                local_eid
            );
        }
        None => {
            let fresh_block =
                bp7::canonical::new_previous_node_block(0, BlockControlFlags::empty(), local_eid);
            bundle.add_canonical_block(fresh_block);
        }
    }

    Ok(bundle)
}
pub async fn forward(mut bp: BundlePack) -> Result<()> {
let bpid = bp.id().to_string();
trace!("Forward request for bundle: {}", bpid);
bp.add_constraint(Constraint::ForwardPending);
bp.remove_constraint(Constraint::DispatchPending);
trace!("updating bundle info in store: {}", bpid);
bp.sync()?;
let bundle_sent = Arc::new(AtomicBool::new(false));
trace!("Check delivery");
let (nodes, delete_afterwards) = routing_sender_for_bundle(bp.clone()).await?;
if !nodes.is_empty() {
debug!("Attempting forwarding of {} to nodes: {:?}", bp.id(), nodes);
}
if nodes.is_empty() {
trace!("No new peers for forwarding of bundle {}", &bp.id());
} else {
debug!("Attempting forwarding of {} to nodes: {:?}", bp.id(), nodes);
let bndl = store_get_bundle(&bpid);
if bndl.is_none() {
bail!("bundle not found: {}", bpid);
}
let mut bndl = bndl.unwrap();
trace!("Handle lifetime");
handle_primary_lifetime(&bndl).await?;
trace!("Handle hop count block");
bndl = handle_hop_count_block(bndl).await?;
trace!("Handle previous node block");
bndl = handle_previous_node_block(bndl).await?;
trace!("Handle bundle age block");
bndl = handle_bundle_age_block(bndl).await?;
let mut wg = Vec::new();
let bundle_data = bndl.to_cbor();
for n in nodes {
let bd = bundle_data.clone(); let bpid = bpid.clone();
let bundle_sent = std::sync::Arc::clone(&bundle_sent);
let n = n.clone();
let task_handle = tokio::spawn(async move {
let start_time = Instant::now();
debug!(
"Sending bundle to a CLA: {} {} {}",
&bpid, n.dest, n.cla_name
);
if let Err(err) = n.transfer(bd).await {
info!(
"Sending bundle {} via {} to {} ({}) failed after {:?}",
&bpid,
n.cla_name,
n.dest,
n.next_hop,
start_time.elapsed()
);
STATS.lock().failed += 1;
debug!("Error while transferring bundle {}: {}", &bpid, err);
let mut failed_peer = None;
if let Err(err) = routing_notify(RoutingNotifcation::SendingFailed(
bpid,
n.next_hop.node().unwrap(),
))
.await
{
error!("Error while sending failed notification: {}", err);
}
if let Some(peer_entry) = (*PEERS.lock()).get_mut(&n.next_hop.node().unwrap()) {
debug!(
"Reporting failed sending to peer: {}",
&n.next_hop.node().unwrap()
);
peer_entry.report_fail();
if peer_entry.failed_too_much() && peer_entry.con_type == PeerType::Dynamic
{
failed_peer = Some(peer_entry.node_name());
}
}
if let Some(peer) = failed_peer {
let peers_before = (*PEERS.lock()).len();
(*PEERS.lock()).remove(&peer);
let peers_after = (*PEERS.lock()).len();
debug!("Removing peer {} from list of neighbors due to too many failed transmissions ({}/{})", peer, peers_before, peers_after);
}
} else {
info!(
"Sending bundle succeeded: {} {} {} in {:?}",
&bpid,
n.dest,
n.cla_name,
start_time.elapsed()
);
STATS.lock().outgoing += 1;
bundle_sent.store(true, Ordering::Relaxed);
if let Err(err) = routing_notify(RoutingNotifcation::SendingSucceeded(
bpid,
n.next_hop.node().unwrap(),
))
.await
{
error!("Error while sending succeeded notification: {}", err);
}
}
});
wg.push(task_handle);
}
use futures::future::join_all;
join_all(wg).await;
if bundle_sent.load(Ordering::Relaxed) {
if bndl
.primary
.bundle_control_flags
.contains(BundleControlFlags::BUNDLE_STATUS_REQUEST_FORWARD)
&& !bndl.is_administrative_record()
&& CONFIG.lock().generate_status_reports
{
send_status_report(&bp, FORWARDED_BUNDLE, NO_INFORMATION).await;
}
if delete_afterwards {
store_remove(&bpid)?;
} else if bndl.is_administrative_record() {
is_administrative_record_valid(&bndl);
contraindicated(bp)?;
}
} else {
info!("Failed to forward bundle to any CLA: {}", bp.id());
}
}
Ok(())
}
pub async fn local_delivery(mut bp: BundlePack) -> Result<()> {
info!("Received bundle for local delivery: {}", bp.id());
let bndl = store_get_bundle(bp.id());
if bndl.is_none() {
bail!("bundle not found");
}
let bndl = bndl.unwrap();
if bp.administrative && !is_administrative_record_valid(&bndl) {
delete(bp, NO_INFORMATION).await?;
bail!("Empty administrative record");
}
bp.add_constraint(Constraint::LocalEndpoint);
bp.sync()?;
if is_local_node_id(&bp.destination) {
if bndl
.primary
.bundle_control_flags
.contains(BundleControlFlags::BUNDLE_STATUS_REQUEST_DELIVERY)
&& !bndl.is_administrative_record()
&& CONFIG.lock().generate_status_reports
{
send_status_report(&bp, DELIVERED_BUNDLE, NO_INFORMATION).await;
}
bp.clear_constraints();
} else {
info!(
"Add forwarding constraint again as bundle is non-local destination: {}",
bp.id()
);
bp.add_constraint(Constraint::ForwardPending);
}
bp.sync()?;
if let Some(aa) = (*DTNCORE.lock()).get_endpoint_mut(&bp.destination) {
info!("Delivering {}", bp.id());
aa.push(&bndl);
STATS.lock().delivered += 1;
}
Ok(())
}
/// Flags a bundle as contraindicated.
///
/// Adds `Constraint::Contraindicated` to `bp` and syncs the bundle pack.
///
/// # Errors
///
/// Propagates the error of `bp.sync()`.
pub fn contraindicated(mut bp: BundlePack) -> Result<()> {
    info!("Bundle marked for contraindication: {}", bp.id());

    bp.add_constraint(Constraint::Contraindicated);
    bp.sync()?;

    Ok(())
}
pub async fn delete(mut bp: BundlePack, reason: StatusReportReason) -> Result<()> {
let bndl = store_get_bundle(bp.id());
if bndl.is_none() {
bail!("bundle not found");
}
let bndl = bndl.unwrap();
if bndl
.primary
.bundle_control_flags
.contains(BundleControlFlags::BUNDLE_STATUS_REQUEST_DELETION)
&& !bndl.is_administrative_record()
&& CONFIG.lock().generate_status_reports
{
send_status_report(&bp, DELETED_BUNDLE, reason).await;
}
bp.clear_constraints();
info!("Bundle marked for deletion: {}", bp.id());
bp.sync()?;
Ok(())
}
/// Checks whether `bundle` carries a parsable administrative record.
///
/// Returns `false` (after logging a warning) when the bundle is not flagged
/// as an administrative record, has no payload block, or its payload cannot
/// be decoded as an `AdministrativeRecord`.
///
/// Side effect: a successfully decoded record is passed to
/// [`inspect_status_report`], which may remove bundles from the store.
fn is_administrative_record_valid(bundle: &Bundle) -> bool {
    if !bundle.is_administrative_record() {
        warn!(
            "Bundle does not contain an administrative record: {}",
            bundle.id()
        );
        return false;
    }

    let payload = match bundle.extension_block_by_type(bp7::PAYLOAD_BLOCK) {
        Some(block) => block,
        None => {
            warn!(
                "Bundle with an administrative record flag misses payload block: {}",
                bundle.id()
            );
            return false;
        }
    };

    // Only raw payload bytes can hold an encoded administrative record.
    let raw = match payload.data() {
        bp7::canonical::CanonicalData::Data(data) => data,
        _ => {
            warn!(
                "Bundle with an administrative record could not be parsed: {}",
                bundle.id()
            );
            return false;
        }
    };

    match serde_cbor::from_slice::<AdministrativeRecord>(raw) {
        Ok(record) => {
            info!(
                "Received bundle contains an administrative record: {} {:?}",
                bundle.id(),
                record
            );
            inspect_status_report(&bundle.id(), record);
            true
        }
        Err(parse_err) => {
            warn!(
                "Bundle with an administrative record could not be parsed: {} {:?}",
                bundle.id(),
                parse_err
            );
            false
        }
    }
}
/// Evaluates a bundle status report and acts on asserted status items.
///
/// `bid` is the id of the bundle that carried the record and is only used
/// for logging. The report is ignored (with a warning) when it is not a
/// bundle status report, has no status information, refers to a bundle that
/// is not in the store, or does not contain exactly
/// `MAX_STATUS_INFORMATION_POS` status items.
///
/// The position of an item in the list identifies the status it describes.
/// Only items whose `asserted` flag is set are acted upon: an asserted
/// "delivered" item removes the referenced bundle from the store, while the
/// other known positions currently trigger no action.
fn inspect_status_report(bid: &str, ar: AdministrativeRecord) {
    if let AdministrativeRecord::BundleStatusReport(bsr) = &ar {
        let sips = &bsr.status_information;
        if sips.is_empty() {
            warn!(
                "Administrative record contains no status information: {} {:?}",
                bid, ar
            );
            return;
        }
        if !store_has_item(&bsr.refbundle()) {
            warn!("Status Report's bundle is unknown: {} {:?}", bid, ar);
            return;
        }
        if sips.len() != bp7::administrative_record::MAX_STATUS_INFORMATION_POS as usize {
            warn!(
                "Status Report's number of status information is invalid: {} {:?}",
                bid,
                sips.len()
            );
            return;
        }

        for (i, sip) in sips.iter().enumerate() {
            debug!("Parsing Status Report: {} #{} {:?} {:?}", bid, i, bsr, sip);

            // Every report carries an item for each status position; a
            // position only applies when its item is asserted. Without this
            // check any report would be treated as a delivery confirmation.
            if !sip.asserted {
                continue;
            }

            match i as u32 {
                bp7::administrative_record::RECEIVED_BUNDLE => {}
                bp7::administrative_record::FORWARDED_BUNDLE => {}
                bp7::administrative_record::DELETED_BUNDLE => {}
                bp7::administrative_record::DELIVERED_BUNDLE => {
                    info!(
                        "Status Report indicated bundle delivery: {} {}",
                        bid,
                        bsr.refbundle()
                    );
                    if store_remove(&bsr.refbundle()).is_err() {
                        warn!(
                            "Status Report could not remove bundle: {} {}",
                            bid,
                            bsr.refbundle()
                        );
                    }
                }
                _ => {
                    warn!(
                        "Status Report has unknown status information code: {} #{}",
                        bid, i,
                    );
                }
            }
        }
    } else {
        warn!("No bundle status information found: {} {:?}", bid, ar);
    }
}
/// Creates a status report for the bundle behind `bp` and queues it.
///
/// No report is produced when the bundle is administrative, its source is
/// `dtn:none`, it is no longer in the store, or its report-to endpoint is
/// registered on this node. Otherwise a new status report bundle is built
/// with this node's EID (`CONFIG.host_eid`) and the CRC code of the original
/// primary block, pushed to the store, and marked `ForwardPending`.
///
/// All failures are logged; nothing is returned to the caller.
async fn send_status_report(
    bp: &BundlePack,
    status: StatusInformationPos,
    reason: StatusReportReason,
) {
    if bp.administrative || bp.source == EndpointID::none() {
        warn!("status report sending denied for dtn:none sources/administrative bundles themselves: {}", bp.id());
        return;
    }

    let bndl = match store_get_bundle(bp.id()) {
        Some(found) => found,
        None => {
            warn!("bundle not found when sending status report: {}", bp.id());
            return;
        }
    };

    // Reports addressed to one of this node's own endpoints are skipped.
    if (*DTNCORE.lock()).is_in_endpoints(&bndl.primary.report_to) {
        return;
    }

    info!(
        "Sending a status report for a bundle: {} {:?} {:?}",
        bp.id(),
        status,
        reason
    );

    let reporter = CONFIG.lock().host_eid.clone();
    let crc_code = bndl.primary.crc.to_code();
    let out_bndl = new_status_report_bundle(&bndl, reporter, crc_code, status, reason);

    if let Err(err) = store_push_bundle(&out_bndl) {
        warn!("Storing new status report failed: {}", err);
        return;
    }

    let mut report_bp: BundlePack = out_bndl.into();
    report_bp.add_constraint(Constraint::ForwardPending);
    if let Err(err) = report_bp.sync() {
        warn!("Sending status report failed: {}", err);
    }
    debug!("Enqueued status report: {}", report_bp.id());
}