use crate::error::Error;
use crate::index::PadInfo;
use crate::internal_events::invoke_put_callback;
use crate::network::NetworkError;
use crate::ops::worker::{self, PoolError, WorkerPoolConfig};
use futures::future::BoxFuture;
use log::{debug, error, info, warn};
use mutant_protocol::{PutCallback, PutEvent};
use std::sync::Arc;
use super::context::Context;
use super::context::PutTaskContext;
use super::task::PutTaskProcessor;
pub async fn recycle_put_pad(
context: Context,
error_cause: Error,
pad_to_recycle: PadInfo,
) -> Result<Option<PadInfo>, Error> {
warn!(
"Recycling pad {} for key '{}' due to error: {:?}",
pad_to_recycle.address, context.name, error_cause
);
debug!(
"Pad to recycle: address={}, status={:?}, chunk_index={}, size={}",
pad_to_recycle.address,
pad_to_recycle.status,
pad_to_recycle.chunk_index,
pad_to_recycle.size
);
match context
.index
.write()
.await
.recycle_errored_pad(&context.name, &pad_to_recycle.address)
.await
{
Ok(new_pad) => {
debug!(
"Successfully recycled pad {} -> {} for key '{}', returning to pool. New pad status: {:?}",
pad_to_recycle.address, new_pad.address, context.name, new_pad.status
);
Ok(Some(new_pad))
}
Err(recycle_err) => {
error!(
"Failed to recycle pad {} for key '{}': {}. Skipping this pad.",
pad_to_recycle.address, context.name, recycle_err
);
Ok(None)
}
}
}
pub async fn write_pipeline(
context: Context,
pads: Vec<PadInfo>,
no_verify: bool,
put_callback: Option<PutCallback>,
) -> Result<(), Error> {
let key_name = context.name.clone();
let total_chunks = pads.len();
let initial_written_count = pads
.iter()
.filter(|p| p.status == PadStatus::Written || p.status == PadStatus::Confirmed)
.count();
let initial_confirmed_count = pads
.iter()
.filter(|p| p.status == PadStatus::Confirmed)
.count();
let chunks_to_reserve = pads
.iter()
.filter(|p| p.status == PadStatus::Generated)
.count();
info!(
"Sending Starting event for key '{}': total={}, written={}, confirmed={}, to_reserve={}",
key_name, total_chunks, initial_written_count, initial_confirmed_count, chunks_to_reserve
);
invoke_put_callback(
&put_callback,
PutEvent::Starting {
total_chunks,
initial_written_count,
initial_confirmed_count,
chunks_to_reserve,
},
)
.await
.map_err(|e| Error::Internal(format!("Callback error on Starting event: {:?}", e)))?;
let pads_to_process: Vec<PadInfo> = pads
.into_iter()
.filter(|p| p.status != PadStatus::Confirmed)
.collect();
let initial_process_count = pads_to_process.len();
if initial_process_count == 0 {
info!("All pads for key '{}' already confirmed.", key_name);
invoke_put_callback(&put_callback, PutEvent::Complete)
.await
.map_err(|e| Error::Internal(format!("Callback error: {:?}", e)))?;
return Ok(());
}
let put_task_context = Arc::new(PutTaskContext {
base_context: context.clone(), no_verify: Arc::new(no_verify),
put_callback: put_callback.clone(),
});
let task_processor = PutTaskProcessor::new(put_task_context.clone());
let config = WorkerPoolConfig {
network: context.network.clone(), client_config: crate::network::client::Config::Put, task_processor,
enable_recycling: true, total_items_hint: initial_process_count, };
debug!(
"Created WorkerPoolConfig for PUT with recycling enabled, total_items_hint={}",
initial_process_count
);
let recycle_fn = {
let context_clone = context.clone(); let key_name_for_log = context.name.to_string();
Arc::new(move |error: Error, pad: PadInfo| {
let context_inner = context_clone.clone(); let key_name_inner = key_name_for_log.clone();
info!(
"Creating recycling function for key '{}', pad {}",
key_name_inner, pad.address
);
Box::pin(async move {
info!(
"Executing recycling function for key '{}', pad {}",
key_name_inner, pad.address
);
recycle_put_pad(context_inner, error, pad).await
}) as BoxFuture<'static, Result<Option<PadInfo>, Error>>
})
};
let pool = match worker::build(config, Some(recycle_fn.clone())).await {
Ok(pool) => pool,
Err(e) => {
error!(
"Failed to build worker pool for PUT '{}': {:?}",
key_name, e
);
return match e {
PoolError::ClientAcquisitionError(msg) => {
Err(Error::Network(NetworkError::ClientAccessError(msg)))
}
_ => Err(Error::Internal(format!("Pool build failed: {:?}", e))),
};
}
};
if let Err(e) = pool.send_items(pads_to_process).await {
error!(
"Failed to send initial pads to worker pool for PUT '{}': {:?}",
key_name, e
);
return match e {
PoolError::PoolSetupError(msg) => Err(Error::Internal(msg)),
_ => Err(Error::Internal(format!("Pool send_items failed: {:?}", e))),
};
}
let pool_result = pool.run(Some(recycle_fn)).await;
match pool_result {
Ok(_results) => {
info!("PUT operation seemingly successful for key '{}'. Final state verification might be needed.", key_name);
invoke_put_callback(&put_callback, PutEvent::Complete)
.await
.map_err(|e| Error::Internal(format!("Callback error: {:?}", e)))?;
Ok(())
}
Err(pool_error) => {
error!(
"PUT worker pool failed for key '{}': {:?}",
key_name, pool_error
);
match pool_error {
PoolError::TaskError(task_err) => Err(task_err), PoolError::JoinError(join_err) => Err(Error::Internal(format!(
"Worker task join error: {:?}",
join_err
))),
PoolError::PoolSetupError(msg) => {
Err(Error::Internal(format!("Pool setup error: {}", msg)))
}
PoolError::ClientAcquisitionError(msg) => {
Err(Error::Network(NetworkError::ClientAccessError(msg)))
}
}
}
}
}
use crate::index::PadStatus;