mutant_lib/ops/put/
task.rs

1use crate::error::Error;
2use crate::index::{PadInfo, PadStatus};
3use crate::internal_events::invoke_put_callback;
4use crate::network::NetworkError;
5use crate::ops::worker::AsyncTask;
6use crate::ops::MAX_CONFIRMATION_DURATION;
7use async_trait::async_trait;
8use log::{debug, error, info, warn};
9use mutant_protocol::PutEvent;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::time::Instant;
13
14use super::super::PAD_RECYCLING_RETRIES;
15use super::context::PutTaskContext;
16
17#[derive(Clone)]
18pub struct PutTaskProcessor {
19    pub context: Arc<PutTaskContext>,
20}
21
22impl PutTaskProcessor {
23    pub fn new(context: Arc<PutTaskContext>) -> Self {
24        Self { context }
25    }
26}
27
28#[async_trait]
29impl AsyncTask<PadInfo, PutTaskContext, autonomi::Client, (), Error> for PutTaskProcessor {
30    type ItemId = usize;
31
32    async fn process(
33        &self,
34        worker_id: usize,
35        client: &autonomi::Client,
36        pad: PadInfo,
37    ) -> Result<(Self::ItemId, ()), (Error, PadInfo)> {
38        let mut pad_state = pad.clone();
39        let current_pad_address = pad_state.address;
40        let initial_status = pad_state.status;
41        let mut put_succeeded = false;
42        let is_public = self.context.base_context.public;
43
44        let should_put = match initial_status {
45            PadStatus::Generated | PadStatus::Free => true,
46            PadStatus::Written => false,
47            PadStatus::Confirmed => {
48                return Ok((pad_state.chunk_index, ()));
49            }
50        };
51
52        if should_put {
53            let chunk_index = pad_state.chunk_index;
54            let range = self
55                .context
56                .base_context
57                .chunk_ranges
58                .get(chunk_index)
59                .ok_or_else(|| {
60                    (
61                        Error::Internal(format!(
62                            "Invalid chunk index {} for key {}",
63                            chunk_index, self.context.base_context.name
64                        )),
65                        pad_state.clone(),
66                    )
67                })?;
68            let chunk_data = self
69                .context
70                .base_context
71                .data
72                .get(range.clone())
73                .ok_or_else(|| {
74                    (
75                        Error::Internal(format!(
76                            "Data range {:?} out of bounds for key {}",
77                            range, self.context.base_context.name
78                        )),
79                        pad_state.clone(),
80                    )
81                })?;
82
83            let max_put_retries = PAD_RECYCLING_RETRIES;
84            let mut last_put_error: Option<Error> = None;
85            for attempt in 1..=max_put_retries {
86                let put_result = self
87                    .context
88                    .base_context
89                    .network
90                    .put(
91                        client,
92                        &pad_state,
93                        chunk_data,
94                        self.context.base_context.encoding,
95                        is_public,
96                    )
97                    .await;
98
99                match put_result {
100                    Ok(_) => {
101                        // Check if this was a Generated pad that needs a PadReserved event
102                        let was_generated = initial_status == PadStatus::Generated;
103
104                        pad_state.status = PadStatus::Written;
105                        match self
106                            .context
107                            .base_context
108                            .index
109                            .write()
110                            .await
111                            .update_pad_status(
112                                &self.context.base_context.name,
113                                &current_pad_address,
114                                PadStatus::Written,
115                                None,
116                            ) {
117                            Ok(updated_pad) => pad_state = updated_pad,
118                            Err(e) => return Err((e, pad_state.clone())),
119                        }
120
121                        // If the pad was in Generated status, send PadReserved event
122                        if was_generated {
123                            info!(
124                                "Worker {} sending PadReserved event for pad {} (chunk {})",
125                                worker_id, current_pad_address, pad_state.chunk_index
126                            );
127                            invoke_put_callback(&self.context.put_callback, PutEvent::PadReserved)
128                                .await
129                                .map_err(|e| (e, pad_state.clone()))?;
130                        }
131
132                        put_succeeded = true;
133                        last_put_error = None;
134                        break;
135                    }
136                    Err(e) => {
137                        warn!(
138                            "Worker {} failed put attempt {}/{} for pad {} (chunk {}): {}. Retrying...",
139                            worker_id, attempt, max_put_retries, current_pad_address, pad_state.chunk_index, e
140                        );
141                        last_put_error = Some(Error::Network(e));
142                        if attempt < max_put_retries {
143                            tokio::time::sleep(Duration::from_secs(1)).await;
144                        }
145                    }
146                }
147            }
148
149            if !put_succeeded {
150                return Err((
151                    last_put_error.unwrap_or_else(|| {
152                        Error::Internal(format!(
153                            "Put failed for pad {} after {} retries with unknown error",
154                            current_pad_address, max_put_retries
155                        ))
156                    }),
157                    pad_state,
158                ));
159            }
160
161            invoke_put_callback(&self.context.put_callback, PutEvent::PadsWritten)
162                .await
163                .map_err(|e| (e, pad_state.clone()))?;
164        } else {
165            put_succeeded = true;
166            pad_state = pad.clone();
167        }
168
169        if put_succeeded && !*self.context.no_verify {
170            let confirmation_start = Instant::now();
171            let max_duration = MAX_CONFIRMATION_DURATION;
172            let mut confirmation_succeeded = false;
173
174            while confirmation_start.elapsed() < max_duration {
175                let owned_key;
176                let secret_key_ref = if is_public {
177                    None
178                } else {
179                    owned_key = pad_state.secret_key();
180                    Some(&owned_key)
181                };
182                match self
183                    .context
184                    .base_context
185                    .network
186                    .get(client, &current_pad_address, secret_key_ref)
187                    .await
188                {
189                    Ok(get_result) => {
190                        let checksum_match = pad.checksum == PadInfo::checksum(&get_result.data);
191                        let counter_match = pad.last_known_counter == get_result.counter;
192                        let size_match = pad.size == get_result.data.len();
193                        if checksum_match && counter_match && size_match {
194                            pad_state.status = PadStatus::Confirmed;
195
196                            match self
197                                .context
198                                .base_context
199                                .index
200                                .write()
201                                .await
202                                .update_pad_status(
203                                    &self.context.base_context.name,
204                                    &current_pad_address,
205                                    PadStatus::Confirmed,
206                                    Some(get_result.counter), // Pass the actual counter from the network
207                                ) {
208                                Ok(final_pad) => {
209                                    confirmation_succeeded = true;
210                                    pad_state = final_pad;
211                                    break;
212                                }
213                                Err(e) => {
214                                    warn!("Worker {} failed to update index status to Confirmed for pad {}: {}. Retrying confirmation...", worker_id, current_pad_address, e);
215                                }
216                            }
217                        }
218                    }
219                    Err(NetworkError::GetError(ant_networking::GetRecordError::RecordNotFound)) => {
220                        debug!(
221                            "Worker {} confirming pad {}, not found yet. Retrying...",
222                            worker_id, current_pad_address
223                        );
224                    }
225                    Err(e) => {
226                        warn!(
227                            "Worker {} encountered network error while confirming pad {}: {}. Retrying confirmation...",
228                            worker_id, current_pad_address, e
229                        );
230                    }
231                }
232                tokio::time::sleep(Duration::from_secs(1)).await;
233            }
234
235            if !confirmation_succeeded {
236                error!(
237                    "Worker {} failed to confirm pad {} within {:?}. Returning error.",
238                    worker_id, current_pad_address, max_duration
239                );
240                return Err((
241                    Error::Internal(format!("Confirmation timeout: {}", current_pad_address)),
242                    pad_state,
243                ));
244            }
245
246            invoke_put_callback(&self.context.put_callback, PutEvent::PadsConfirmed)
247                .await
248                .map_err(|e| (e, pad_state.clone()))?;
249        }
250
251        Ok((pad_state.chunk_index, ()))
252    }
253}