mutant_lib/ops/put/
task.rs1use crate::error::Error;
2use crate::index::{PadInfo, PadStatus};
3use crate::internal_events::invoke_put_callback;
4use crate::network::NetworkError;
5use crate::ops::worker::AsyncTask;
6use crate::ops::MAX_CONFIRMATION_DURATION;
7use async_trait::async_trait;
8use log::{debug, error, info, warn};
9use mutant_protocol::PutEvent;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::time::Instant;
13
14use super::super::PAD_RECYCLING_RETRIES;
15use super::context::PutTaskContext;
16
17#[derive(Clone)]
18pub struct PutTaskProcessor {
19 pub context: Arc<PutTaskContext>,
20}
21
22impl PutTaskProcessor {
23 pub fn new(context: Arc<PutTaskContext>) -> Self {
24 Self { context }
25 }
26}
27
28#[async_trait]
29impl AsyncTask<PadInfo, PutTaskContext, autonomi::Client, (), Error> for PutTaskProcessor {
30 type ItemId = usize;
31
32 async fn process(
33 &self,
34 worker_id: usize,
35 client: &autonomi::Client,
36 pad: PadInfo,
37 ) -> Result<(Self::ItemId, ()), (Error, PadInfo)> {
38 let mut pad_state = pad.clone();
39 let current_pad_address = pad_state.address;
40 let initial_status = pad_state.status;
41 let mut put_succeeded = false;
42 let is_public = self.context.base_context.public;
43
44 let should_put = match initial_status {
45 PadStatus::Generated | PadStatus::Free => true,
46 PadStatus::Written => false,
47 PadStatus::Confirmed => {
48 return Ok((pad_state.chunk_index, ()));
49 }
50 };
51
52 if should_put {
53 let chunk_index = pad_state.chunk_index;
54 let range = self
55 .context
56 .base_context
57 .chunk_ranges
58 .get(chunk_index)
59 .ok_or_else(|| {
60 (
61 Error::Internal(format!(
62 "Invalid chunk index {} for key {}",
63 chunk_index, self.context.base_context.name
64 )),
65 pad_state.clone(),
66 )
67 })?;
68 let chunk_data = self
69 .context
70 .base_context
71 .data
72 .get(range.clone())
73 .ok_or_else(|| {
74 (
75 Error::Internal(format!(
76 "Data range {:?} out of bounds for key {}",
77 range, self.context.base_context.name
78 )),
79 pad_state.clone(),
80 )
81 })?;
82
83 let max_put_retries = PAD_RECYCLING_RETRIES;
84 let mut last_put_error: Option<Error> = None;
85 for attempt in 1..=max_put_retries {
86 let put_result = self
87 .context
88 .base_context
89 .network
90 .put(
91 client,
92 &pad_state,
93 chunk_data,
94 self.context.base_context.encoding,
95 is_public,
96 )
97 .await;
98
99 match put_result {
100 Ok(_) => {
101 let was_generated = initial_status == PadStatus::Generated;
103
104 pad_state.status = PadStatus::Written;
105 match self
106 .context
107 .base_context
108 .index
109 .write()
110 .await
111 .update_pad_status(
112 &self.context.base_context.name,
113 ¤t_pad_address,
114 PadStatus::Written,
115 None,
116 ) {
117 Ok(updated_pad) => pad_state = updated_pad,
118 Err(e) => return Err((e, pad_state.clone())),
119 }
120
121 if was_generated {
123 info!(
124 "Worker {} sending PadReserved event for pad {} (chunk {})",
125 worker_id, current_pad_address, pad_state.chunk_index
126 );
127 invoke_put_callback(&self.context.put_callback, PutEvent::PadReserved)
128 .await
129 .map_err(|e| (e, pad_state.clone()))?;
130 }
131
132 put_succeeded = true;
133 last_put_error = None;
134 break;
135 }
136 Err(e) => {
137 warn!(
138 "Worker {} failed put attempt {}/{} for pad {} (chunk {}): {}. Retrying...",
139 worker_id, attempt, max_put_retries, current_pad_address, pad_state.chunk_index, e
140 );
141 last_put_error = Some(Error::Network(e));
142 if attempt < max_put_retries {
143 tokio::time::sleep(Duration::from_secs(1)).await;
144 }
145 }
146 }
147 }
148
149 if !put_succeeded {
150 return Err((
151 last_put_error.unwrap_or_else(|| {
152 Error::Internal(format!(
153 "Put failed for pad {} after {} retries with unknown error",
154 current_pad_address, max_put_retries
155 ))
156 }),
157 pad_state,
158 ));
159 }
160
161 invoke_put_callback(&self.context.put_callback, PutEvent::PadsWritten)
162 .await
163 .map_err(|e| (e, pad_state.clone()))?;
164 } else {
165 put_succeeded = true;
166 pad_state = pad.clone();
167 }
168
169 if put_succeeded && !*self.context.no_verify {
170 let confirmation_start = Instant::now();
171 let max_duration = MAX_CONFIRMATION_DURATION;
172 let mut confirmation_succeeded = false;
173
174 while confirmation_start.elapsed() < max_duration {
175 let owned_key;
176 let secret_key_ref = if is_public {
177 None
178 } else {
179 owned_key = pad_state.secret_key();
180 Some(&owned_key)
181 };
182 match self
183 .context
184 .base_context
185 .network
186 .get(client, ¤t_pad_address, secret_key_ref)
187 .await
188 {
189 Ok(get_result) => {
190 let checksum_match = pad.checksum == PadInfo::checksum(&get_result.data);
191 let counter_match = pad.last_known_counter == get_result.counter;
192 let size_match = pad.size == get_result.data.len();
193 if checksum_match && counter_match && size_match {
194 pad_state.status = PadStatus::Confirmed;
195
196 match self
197 .context
198 .base_context
199 .index
200 .write()
201 .await
202 .update_pad_status(
203 &self.context.base_context.name,
204 ¤t_pad_address,
205 PadStatus::Confirmed,
206 Some(get_result.counter), ) {
208 Ok(final_pad) => {
209 confirmation_succeeded = true;
210 pad_state = final_pad;
211 break;
212 }
213 Err(e) => {
214 warn!("Worker {} failed to update index status to Confirmed for pad {}: {}. Retrying confirmation...", worker_id, current_pad_address, e);
215 }
216 }
217 }
218 }
219 Err(NetworkError::GetError(ant_networking::GetRecordError::RecordNotFound)) => {
220 debug!(
221 "Worker {} confirming pad {}, not found yet. Retrying...",
222 worker_id, current_pad_address
223 );
224 }
225 Err(e) => {
226 warn!(
227 "Worker {} encountered network error while confirming pad {}: {}. Retrying confirmation...",
228 worker_id, current_pad_address, e
229 );
230 }
231 }
232 tokio::time::sleep(Duration::from_secs(1)).await;
233 }
234
235 if !confirmation_succeeded {
236 error!(
237 "Worker {} failed to confirm pad {} within {:?}. Returning error.",
238 worker_id, current_pad_address, max_duration
239 );
240 return Err((
241 Error::Internal(format!("Confirmation timeout: {}", current_pad_address)),
242 pad_state,
243 ));
244 }
245
246 invoke_put_callback(&self.context.put_callback, PutEvent::PadsConfirmed)
247 .await
248 .map_err(|e| (e, pad_state.clone()))?;
249 }
250
251 Ok((pad_state.chunk_index, ()))
252 }
253}