// n0_mainline/core/put_query.rs

1use tracing::{debug, trace};
2
3use crate::{
4    Node,
5    common::{
6        ErrorSpecific, Id, PutRequest, PutRequestSpecific, RequestSpecific, RequestTypeSpecific,
7    },
8};
9
10use crate::actor::socket::KrpcSocket;
11
12#[derive(Debug)]
13/// Once an [super::IterativeQuery] is done, or if a previous cached one was a available,
14/// we can store data at the closest nodes using this PutQuery, that keeps track of
15/// acknowledging nodes, and or errors.
16pub struct PutQuery {
17    pub target: Id,
18    /// Nodes that confirmed success
19    stored_at: u8,
20    inflight_requests: Vec<u32>,
21    pub request: PutRequestSpecific,
22    errors: Vec<(u8, ErrorSpecific)>,
23    extra_nodes: Box<[Node]>,
24}
25
26impl PutQuery {
27    pub fn new(request: PutRequestSpecific, extra_nodes: Option<Box<[Node]>>) -> Self {
28        Self {
29            target: *request.target(),
30            stored_at: 0,
31            inflight_requests: Vec::new(),
32            request,
33            errors: Vec::new(),
34            extra_nodes: extra_nodes.unwrap_or(Box::new([])),
35        }
36    }
37
38    pub fn start(
39        &mut self,
40        socket: &mut KrpcSocket,
41        closest_nodes: &[Node],
42    ) -> Result<(), PutError> {
43        if self.started() {
44            return Ok(());
45        };
46
47        let target = self.target;
48        debug!(?target, closest_secure_nodes = ?closest_nodes.len(), "PutQuery start");
49
50        if closest_nodes.is_empty() {
51            Err(PutQueryError::NoClosestNodes)?;
52        }
53
54        for node in closest_nodes
55            .iter()
56            .take(u8::MAX as usize)
57            .chain(self.extra_nodes.iter())
58        {
59            // Set correct values to the request placeholders
60            if let Some(token) = node.token() {
61                let tid = socket.request(
62                    node.address(),
63                    RequestSpecific {
64                        requester_id: Id::random(),
65                        request_type: RequestTypeSpecific::Put(PutRequest {
66                            token,
67                            put_request_type: self.request.clone(),
68                        }),
69                    },
70                );
71
72                self.inflight_requests.push(tid);
73            }
74        }
75
76        Ok(())
77    }
78
79    pub fn started(&self) -> bool {
80        !self.inflight_requests.is_empty()
81    }
82
83    pub fn inflight(&self, tid: u32) -> bool {
84        self.inflight_requests.contains(&tid)
85    }
86
87    pub fn success(&mut self) {
88        trace!(target = ?self.target, "PutQuery got success response");
89        self.stored_at += 1
90    }
91
92    pub fn error(&mut self, error: ErrorSpecific) {
93        debug!(target = ?self.target, ?error, "PutQuery got error");
94
95        if let Some(pos) = self
96            .errors
97            .iter()
98            .position(|(_, err)| error.code == err.code)
99        {
100            // Increment the count of the existing error
101            self.errors[pos].0 += 1;
102
103            // Move the updated element to maintain the order (highest count first)
104            let mut i = pos;
105            while i > 0 && self.errors[i].0 > self.errors[i - 1].0 {
106                self.errors.swap(i, i - 1);
107                i -= 1;
108            }
109        } else {
110            // Add the new error with a count of 1
111            self.errors.push((1, error));
112        }
113    }
114
115    /// Check if the query is either successfully done, or terminated with an error.
116    pub fn check(&self, socket: &KrpcSocket) -> Result<bool, PutError> {
117        // And all queries got responses or timedout
118        if self.is_done(socket) {
119            let target = self.target;
120
121            if self.stored_at == 0 {
122                let most_common_error = self.most_common_error();
123
124                debug!(
125                    ?target,
126                    ?most_common_error,
127                    nodes_count = self.inflight_requests.len(),
128                    "Put Query: failed"
129                );
130
131                return Err(most_common_error
132                    .map(|(_, error)| error)
133                    .unwrap_or(PutQueryError::Timeout.into()));
134            }
135
136            debug!(?target, stored_at = ?self.stored_at, "PutQuery Done successfully");
137
138            return Ok(true);
139        } else if let Some(most_common_error) = self.majority_nodes_rejected_put_mutable() {
140            let target = self.target;
141
142            debug!(
143                ?target,
144                ?most_common_error,
145                nodes_count = self.inflight_requests.len(),
146                "PutQuery for MutableItem was rejected by most nodes with 3xx code."
147            );
148
149            return Err(most_common_error)?;
150        }
151
152        Ok(false)
153    }
154
155    /// Query started and finished
156    fn is_done(&self, socket: &KrpcSocket) -> bool {
157        // Didn't start yet.
158        if self.inflight_requests.is_empty() {
159            return false;
160        }
161
162        !self
163            .inflight_requests
164            .iter()
165            .any(|tid| socket.inflight(tid))
166    }
167
168    /// Return most common error if any
169    fn majority_nodes_rejected_put_mutable(&self) -> Option<ConcurrencyError> {
170        let half = ((self.inflight_requests.len() / 2) + 1) as u8;
171
172        if matches!(self.request, PutRequestSpecific::PutMutable(_)) {
173            return self.most_common_error().and_then(|(count, error)| {
174                if count >= half {
175                    if let PutError::Concurrency(err) = error {
176                        Some(err)
177                    } else {
178                        None
179                    }
180                } else {
181                    None
182                }
183            });
184        };
185
186        None
187    }
188
189    fn most_common_error(&self) -> Option<(u8, PutError)> {
190        self.errors
191            .first()
192            .and_then(|(count, error)| match error.code {
193                301 => Some((*count, PutError::from(ConcurrencyError::CasFailed))),
194                302 => Some((*count, PutError::from(ConcurrencyError::NotMostRecent))),
195                _ => None,
196            })
197    }
198}
199
200#[n0_error::stack_error(derive, from_sources, std_sources)]
201#[derive(Clone)]
202/// PutQuery errors
203pub enum PutError {
204    /// Common PutQuery errors
205    #[error(transparent)]
206    Query(PutQueryError),
207
208    #[error(transparent)]
209    /// PutQuery for [crate::MutableItem] errors
210    Concurrency(ConcurrencyError),
211}
212
213#[n0_error::stack_error(derive, std_sources)]
214#[derive(Clone)]
215/// Common PutQuery errors
216pub enum PutQueryError {
217    /// Failed to find any nodes close, usually means dht node failed to bootstrap,
218    /// so the routing table is empty. Check the machine's access to UDP socket,
219    /// or find better bootstrapping nodes.
220    #[error("Failed to find any nodes close to store value at")]
221    NoClosestNodes,
222
223    /// Either Put Query failed to store at any nodes, and most nodes responded
224    /// with a non `301` nor `302` errors.
225    ///
226    /// Either way; contains the most common error response.
227    #[error("Query Error Response")]
228    ErrorResponse(ErrorSpecific),
229
230    /// PutQuery timed out with no responses neither success or errors
231    #[error("PutQuery timed out with no responses neither success or errors")]
232    Timeout,
233
234    /// The DHT actor task has shut down.
235    #[error("DHT actor task has shut down")]
236    Shutdown,
237}
238
239#[n0_error::stack_error(derive, std_sources)]
240#[derive(Clone)]
241/// PutQuery for [crate::MutableItem] errors
242pub enum ConcurrencyError {
243    /// The [crate::MutableItem::seq] is less than or equal the sequence from another signed item.
244    ///
245    /// Try reading most recent mutable item before writing again.
246    #[error(
247        "MutableItem::seq is not the most recent, try reading most recent item before writing again."
248    )]
249    NotMostRecent,
250
251    /// The `CAS` condition does not match the `seq` of the most recent known signed item.
252    #[error("CAS check failed, try reading most recent item before writing again.")]
253    CasFailed,
254}