1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use std::vec::Vec;
use bytes::Bytes;
use futures_util::{pin_mut, StreamExt};
use tokio::sync::mpsc::Sender;
use tracing::error;
use crate::base::iana::OptRcode;
use crate::base::{Name, Rtype};
use crate::zonetree::{SharedRrset, StoredName, ZoneDiff, ZoneDiffItem};
//------------ DiffFunneler ----------------------------------------------------
/// Feeds the contents of a list of zone diffs, framed as an IXFR response
/// body, into a batcher channel.
///
/// The funneler is consumed by `run()`, which sends `(owner name, RRset)`
/// pairs over `batcher_tx` in this order: the current zone SOA, then for
/// each diff its removed section followed by its added section, and
/// finally the current zone SOA again.
pub struct DiffFunneler<Diff> {
/// Owner name under which every SOA RRset is sent, and the name used to
/// look up the SOA RRset in each diff.
// NOTE(review): presumably the zone apex / IXFR query name -- confirm
// against the caller.
qname: StoredName,
/// SOA RRset sent once before and once after all difference sequences.
zone_soa_rrset: SharedRrset,
/// The diffs to send; they are emitted in the order stored here.
diffs: Vec<Diff>,
/// Sending half of the channel that receives every `(owner, RRset)` pair.
batcher_tx: Sender<(Name<Bytes>, SharedRrset)>,
}
impl<Diff> DiffFunneler<Diff>
where
Diff: ZoneDiff,
{
pub fn new(
qname: StoredName,
zone_soa_rrset: SharedRrset,
diffs: Vec<Diff>,
batcher_tx: Sender<(Name<Bytes>, SharedRrset)>,
) -> Self {
Self {
qname,
zone_soa_rrset,
diffs,
batcher_tx,
}
}
pub async fn run(self) -> Result<(), OptRcode> {
// https://datatracker.ietf.org/doc/html/rfc1995#section-4
// 4. Response Format
// ...
// "If incremental zone transfer is available, one or more
// difference sequences is returned. The list of difference
// sequences is preceded and followed by a copy of the server's
// current version of the SOA.
//
// Each difference sequence represents one update to the zone
// (one SOA serial change) consisting of deleted RRs and added
// RRs. The first RR of the deleted RRs is the older SOA RR
// and the first RR of the added RRs is the newer SOA RR.
//
// Modification of an RR is performed first by removing the
// original RR and then adding the modified one.
//
// The sequences of differential information are ordered oldest
// first newest last. Thus, the differential sequences are the
// history of changes made since the version known by the IXFR
// client up to the server's current version.
//
// RRs in the incremental transfer messages may be partial. That
// is, if a single RR of multiple RRs of the same RR type changes,
// only the changed RR is transferred."
if let Err(err) = self
.batcher_tx
.send((self.qname.clone(), self.zone_soa_rrset.clone()))
.await
{
error!("Internal error: Failed to send initial IXFR SOA to batcher: {err}");
return Err(OptRcode::SERVFAIL);
}
let qname = self.qname.clone();
for diff in self.diffs {
// 4. Response Format
// "Each difference sequence represents one update to the
// zone (one SOA serial change) consisting of deleted RRs
// and added RRs. The first RR of the deleted RRs is the
// older SOA RR and the first RR of the added RRs is the
// newer SOA RR.
let removed_soa =
diff.get_removed(qname.clone(), Rtype::SOA).await.unwrap(); // The diff MUST have a SOA record
Self::send_diff_section(
&qname,
&self.batcher_tx,
removed_soa,
diff.removed(),
)
.await?;
let added_soa =
diff.get_added(qname.clone(), Rtype::SOA).await.unwrap(); // The diff MUST have a SOA record
Self::send_diff_section(
&qname,
&self.batcher_tx,
added_soa,
diff.added(),
)
.await?;
}
if let Err(err) = self
.batcher_tx
.send((qname.clone(), self.zone_soa_rrset))
.await
{
error!("Internal error: Failed to send final IXFR SOA to batcher: {err}");
return Err(OptRcode::SERVFAIL);
}
Ok(())
}
async fn send_diff_section(
qname: &StoredName,
batcher_tx: &Sender<(Name<Bytes>, SharedRrset)>,
soa: &SharedRrset,
diff_stream: <Diff as ZoneDiff>::Stream<'_>,
) -> Result<(), OptRcode> {
if let Err(err) = batcher_tx.send((qname.clone(), soa.clone())).await
{
error!("Internal error: Failed to send SOA to batcher: {err}");
return Err(OptRcode::SERVFAIL);
}
pin_mut!(diff_stream);
while let Some(item) = diff_stream.next().await {
let (owner, rtype) = item.key();
if *rtype != Rtype::SOA {
let rrset = item.value();
if let Err(err) =
batcher_tx.send((owner.clone(), rrset.clone())).await
{
error!("Internal error: Failed to send RRSET to batcher: {err}");
return Err(OptRcode::SERVFAIL);
}
}
}
Ok(())
}
}