1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use std::boxed::Box;
use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::mpsc::Sender;
use tokio::sync::Semaphore;
use tracing::error;
use crate::base::iana::OptRcode;
use crate::base::{Name, Rtype};
use crate::zonetree::{ReadableZone, SharedRrset, StoredName};
//------------ ZoneFunneler ---------------------------------------------------
pub struct ZoneFunneler {
read: Box<dyn ReadableZone>,
qname: StoredName,
zone_soa_rrset: SharedRrset,
batcher_tx: Sender<(Name<Bytes>, SharedRrset)>,
zone_walk_semaphore: Arc<Semaphore>,
}
impl ZoneFunneler {
pub fn new(
read: Box<dyn ReadableZone>,
qname: StoredName,
zone_soa_rrset: SharedRrset,
batcher_tx: Sender<(Name<Bytes>, SharedRrset)>,
zone_walk_semaphore: Arc<Semaphore>,
) -> Self {
Self {
read,
qname,
zone_soa_rrset,
batcher_tx,
zone_walk_semaphore,
}
}
pub async fn run(self) -> Result<(), OptRcode> {
// Limit the number of concurrently running XFR related zone walking
// operations.
if self.zone_walk_semaphore.acquire().await.is_err() {
error!("Internal error: Failed to acquire XFR zone walking semaphore");
return Err(OptRcode::SERVFAIL);
}
let cloned_batcher_tx = self.batcher_tx.clone();
let op = Box::new(
move |owner: StoredName,
rrset: &SharedRrset,
_at_zone_cut: bool| {
if rrset.rtype() != Rtype::SOA {
let _ = cloned_batcher_tx
.blocking_send((owner.clone(), rrset.clone()));
// If the blocking send fails it means that the
// batcher is no longer available. This can happen if
// it was no longer able to pass messages back to the
// underlying transport, which can happen if the
// client closed the connection. We don't log this
// because we can't stop the tree walk and so will
// keep hitting this error until the tree walk is
// complete, causing a lot of noise if we were to log
// this.
}
},
);
// Walk the zone tree, invoking our operation for each leaf.
match self.read.is_async() {
true => {
self.read.walk_async(op).await;
if let Err(err) = self
.batcher_tx
.send((self.qname, self.zone_soa_rrset))
.await
{
error!("Internal error: Failed to send final AXFR SOA to batcher: {err}");
return Err(OptRcode::SERVFAIL);
}
}
false => {
tokio::task::spawn_blocking(move || {
self.read.walk(op);
if let Err(err) = self
.batcher_tx
.blocking_send((self.qname, self.zone_soa_rrset))
{
error!("Internal error: Failed to send final AXFR SOA to batcher: {err}");
// Note: The lack of the final SOA will be detected by the batcher.
}
});
}
}
Ok(())
}
}