n0_mainline/core/
put_query.rs1use tracing::{debug, trace};
2
3use crate::{
4 Node,
5 common::{
6 ErrorSpecific, Id, PutRequest, PutRequestSpecific, RequestSpecific, RequestTypeSpecific,
7 },
8};
9
10use crate::actor::socket::KrpcSocket;
11
/// Bookkeeping for a single put query: the request being stored, the requests
/// sent for it, and the success/error responses recorded so far.
#[derive(Debug)]
pub struct PutQuery {
    /// Target id of the request, copied from `request.target()` in `new`.
    pub target: Id,
    /// Number of success responses recorded through `success`.
    stored_at: u8,
    /// Transaction ids of every request sent by `start`.
    ///
    /// No method visible in this file removes entries, so this is the full set
    /// of requests ever sent, not only the ones still awaiting a response.
    inflight_requests: Vec<u32>,
    /// The put request that is cloned into every outgoing message.
    pub request: PutRequestSpecific,
    /// `(count, error)` pairs, one per distinct error code, maintained by
    /// `error` so that entries with higher counts come first.
    errors: Vec<(u8, ErrorSpecific)>,
    /// Additional nodes contacted by `start` after the closest nodes.
    extra_nodes: Box<[Node]>,
}
25
26impl PutQuery {
27 pub fn new(request: PutRequestSpecific, extra_nodes: Option<Box<[Node]>>) -> Self {
28 Self {
29 target: *request.target(),
30 stored_at: 0,
31 inflight_requests: Vec::new(),
32 request,
33 errors: Vec::new(),
34 extra_nodes: extra_nodes.unwrap_or(Box::new([])),
35 }
36 }
37
38 pub fn start(
39 &mut self,
40 socket: &mut KrpcSocket,
41 closest_nodes: &[Node],
42 ) -> Result<(), PutError> {
43 if self.started() {
44 return Ok(());
45 };
46
47 let target = self.target;
48 debug!(?target, closest_secure_nodes = ?closest_nodes.len(), "PutQuery start");
49
50 if closest_nodes.is_empty() {
51 Err(PutQueryError::NoClosestNodes)?;
52 }
53
54 for node in closest_nodes
55 .iter()
56 .take(u8::MAX as usize)
57 .chain(self.extra_nodes.iter())
58 {
59 if let Some(token) = node.token() {
61 let tid = socket.request(
62 node.address(),
63 RequestSpecific {
64 requester_id: Id::random(),
65 request_type: RequestTypeSpecific::Put(PutRequest {
66 token,
67 put_request_type: self.request.clone(),
68 }),
69 },
70 );
71
72 self.inflight_requests.push(tid);
73 }
74 }
75
76 Ok(())
77 }
78
79 pub fn started(&self) -> bool {
80 !self.inflight_requests.is_empty()
81 }
82
83 pub fn inflight(&self, tid: u32) -> bool {
84 self.inflight_requests.contains(&tid)
85 }
86
87 pub fn success(&mut self) {
88 trace!(target = ?self.target, "PutQuery got success response");
89 self.stored_at += 1
90 }
91
92 pub fn error(&mut self, error: ErrorSpecific) {
93 debug!(target = ?self.target, ?error, "PutQuery got error");
94
95 if let Some(pos) = self
96 .errors
97 .iter()
98 .position(|(_, err)| error.code == err.code)
99 {
100 self.errors[pos].0 += 1;
102
103 let mut i = pos;
105 while i > 0 && self.errors[i].0 > self.errors[i - 1].0 {
106 self.errors.swap(i, i - 1);
107 i -= 1;
108 }
109 } else {
110 self.errors.push((1, error));
112 }
113 }
114
115 pub fn check(&self, socket: &KrpcSocket) -> Result<bool, PutError> {
117 if self.is_done(socket) {
119 let target = self.target;
120
121 if self.stored_at == 0 {
122 let most_common_error = self.most_common_error();
123
124 debug!(
125 ?target,
126 ?most_common_error,
127 nodes_count = self.inflight_requests.len(),
128 "Put Query: failed"
129 );
130
131 return Err(most_common_error
132 .map(|(_, error)| error)
133 .unwrap_or(PutQueryError::Timeout.into()));
134 }
135
136 debug!(?target, stored_at = ?self.stored_at, "PutQuery Done successfully");
137
138 return Ok(true);
139 } else if let Some(most_common_error) = self.majority_nodes_rejected_put_mutable() {
140 let target = self.target;
141
142 debug!(
143 ?target,
144 ?most_common_error,
145 nodes_count = self.inflight_requests.len(),
146 "PutQuery for MutableItem was rejected by most nodes with 3xx code."
147 );
148
149 return Err(most_common_error)?;
150 }
151
152 Ok(false)
153 }
154
155 fn is_done(&self, socket: &KrpcSocket) -> bool {
157 if self.inflight_requests.is_empty() {
159 return false;
160 }
161
162 !self
163 .inflight_requests
164 .iter()
165 .any(|tid| socket.inflight(tid))
166 }
167
168 fn majority_nodes_rejected_put_mutable(&self) -> Option<ConcurrencyError> {
170 let half = ((self.inflight_requests.len() / 2) + 1) as u8;
171
172 if matches!(self.request, PutRequestSpecific::PutMutable(_)) {
173 return self.most_common_error().and_then(|(count, error)| {
174 if count >= half {
175 if let PutError::Concurrency(err) = error {
176 Some(err)
177 } else {
178 None
179 }
180 } else {
181 None
182 }
183 });
184 };
185
186 None
187 }
188
189 fn most_common_error(&self) -> Option<(u8, PutError)> {
190 self.errors
191 .first()
192 .and_then(|(count, error)| match error.code {
193 301 => Some((*count, PutError::from(ConcurrencyError::CasFailed))),
194 302 => Some((*count, PutError::from(ConcurrencyError::NotMostRecent))),
195 _ => None,
196 })
197 }
198}
199
/// Error produced by a put query: either a failure of the query itself or a
/// concurrency rejection.
#[n0_error::stack_error(derive, from_sources, std_sources)]
#[derive(Clone)]
pub enum PutError {
    /// The query itself failed; see [`PutQueryError`] for the possible causes.
    #[error(transparent)]
    Query(PutQueryError),

    /// The put was rejected with a concurrency error; see [`ConcurrencyError`].
    #[error(transparent)]
    Concurrency(ConcurrencyError),
}
212
/// Failures of the put query itself, as opposed to concurrency rejections.
#[n0_error::stack_error(derive, std_sources)]
#[derive(Clone)]
pub enum PutQueryError {
    /// `PutQuery::start` was called with an empty list of closest nodes.
    #[error("Failed to find any nodes close to store value at")]
    NoClosestNodes,

    /// Carries an error response.
    ///
    /// NOTE(review): not constructed anywhere in this file; confirm where it is
    /// produced.
    #[error("Query Error Response")]
    ErrorResponse(ErrorSpecific),

    /// Returned by `PutQuery::check` when the query finished without a single
    /// success and without an error that maps to a [`ConcurrencyError`].
    #[error("PutQuery timed out with no responses neither success or errors")]
    Timeout,

    /// NOTE(review): not produced in this file; presumably reported when the
    /// DHT actor task is no longer running. Confirm against the caller.
    #[error("DHT actor task has shut down")]
    Shutdown,
}
238
/// Rejections of a put that `PutQuery` derives from 3xx error response codes.
#[n0_error::stack_error(derive, std_sources)]
#[derive(Clone)]
pub enum ConcurrencyError {
    /// Produced from an error response with code 302.
    #[error(
        "MutableItem::seq is not the most recent, try reading most recent item before writing again."
    )]
    NotMostRecent,

    /// Produced from an error response with code 301.
    #[error("CAS check failed, try reading most recent item before writing again.")]
    CasFailed,
}