use crate::error::Error;
use crate::index::{PadInfo, PadStatus};
use crate::internal_events::invoke_put_callback;
use crate::network::Network;
use crate::ops::{DATA_ENCODING_PRIVATE_DATA, DATA_ENCODING_PUBLIC_DATA, DATA_ENCODING_PUBLIC_INDEX};
use autonomi::ScratchpadAddress;
use log::info;
use mutant_protocol::{PutCallback, PutEvent, StorageMode};
use std::sync::Arc;
use tokio::sync::RwLock;
use super::context::Context;
use super::pipeline::write_pipeline;
pub async fn update(
index: Arc<RwLock<crate::index::master_index::MasterIndex>>,
network: Arc<Network>,
key_name: &str,
content: Arc<Vec<u8>>,
mode: StorageMode,
public: bool,
no_verify: bool,
put_callback: Option<PutCallback>,
) -> Result<ScratchpadAddress, Error> {
info!("Efficient update for {}", key_name);
let existing_pads = index.read().await.get_pads(key_name);
if existing_pads.is_empty() {
return Err(Error::Internal(format!("Key '{}' not found", key_name)));
}
let chunk_ranges = index.read().await.chunk_data(&content, mode.clone());
let preserved_index_pad = if public && index.read().await.is_public(key_name) {
info!("Preserving public index pad for key {}", key_name);
index.read().await.extract_public_index_pad(key_name)
} else {
None
};
let existing_data_pads_count = existing_pads.len();
let new_data_pads_count = chunk_ranges.len();
info!(
"Update for key '{}': existing pads: {}, new chunks: {}",
key_name, existing_data_pads_count, new_data_pads_count
);
let mut updated_pads = Vec::with_capacity(new_data_pads_count);
let min_count = std::cmp::min(existing_data_pads_count, new_data_pads_count);
for i in 0..min_count {
let pad = &existing_pads[i];
let chunk_range = &chunk_ranges[i];
let chunk_data = &content[chunk_range.clone()];
let chunk_checksum = PadInfo::checksum(chunk_data);
if pad.checksum == chunk_checksum {
info!("Pad {} (chunk {}) has matching checksum, keeping as is", pad.address, i);
updated_pads.push(pad.clone());
} else {
info!("Pad {} (chunk {}) has different checksum, marking for update", pad.address, i);
let mut updated_pad = pad.clone();
updated_pad.status = PadStatus::Free;
updated_pad.checksum = chunk_checksum;
updated_pad.size = chunk_data.len();
updated_pad.last_known_counter += 1;
updated_pads.push(updated_pad);
}
}
if new_data_pads_count > existing_data_pads_count {
info!(
"Need {} additional pads for key '{}'",
new_data_pads_count - existing_data_pads_count,
key_name
);
let additional_chunks = chunk_ranges[existing_data_pads_count..].to_vec();
let start_offset = additional_chunks[0].start;
let end_offset = additional_chunks.last().unwrap().end;
let additional_content = Arc::new(content[start_offset..end_offset].to_vec());
let mut new_chunk_ranges = Vec::new();
for chunk_range in &additional_chunks {
let relative_start = chunk_range.start - start_offset;
let relative_end = chunk_range.end - start_offset;
new_chunk_ranges.push(relative_start..relative_end);
}
let mut additional_pads = index
.write()
.await
.acquire_pads(&additional_content, &new_chunk_ranges)?;
for (i, pad) in additional_pads.iter_mut().enumerate() {
pad.chunk_index = existing_data_pads_count + i;
}
updated_pads.extend(additional_pads);
} else if new_data_pads_count < existing_data_pads_count {
info!(
"Freeing {} excess pads for key '{}'",
existing_data_pads_count - new_data_pads_count,
key_name
);
let excess_pads = existing_pads[new_data_pads_count..].to_vec();
index.write().await.free_pads(excess_pads)?;
}
if public {
if let Some(mut index_pad) = preserved_index_pad {
index_pad.status = PadStatus::Free;
index_pad.last_known_counter += 1;
index.write().await.update_key_with_pads(key_name, updated_pads.clone(), Some(index_pad))?;
} else {
index.write().await.update_key_with_pads(key_name, updated_pads.clone(), None)?;
}
} else {
index.write().await.update_key_with_pads(key_name, updated_pads.clone(), None)?;
}
let address = updated_pads[0].address;
let pads_to_write: Vec<PadInfo> = updated_pads
.iter()
.filter(|p| p.status == PadStatus::Free || p.status == PadStatus::Generated)
.cloned()
.collect();
if !pads_to_write.is_empty() {
info!(
"Writing {} pads for key '{}' (out of {} total)",
pads_to_write.len(),
key_name,
updated_pads.len()
);
let encoding = if public {
DATA_ENCODING_PUBLIC_DATA
} else {
DATA_ENCODING_PRIVATE_DATA
};
let context = Context {
index: index.clone(),
network: network.clone(),
name: Arc::new(key_name.to_string()),
chunk_ranges: Arc::new(chunk_ranges),
data: content.clone(),
public,
encoding,
};
write_pipeline(context, pads_to_write, no_verify, put_callback.clone()).await?;
} else {
info!("No pads need to be written for key '{}'", key_name);
}
if public {
let (index_pad, index_data) = index.write().await.populate_index_pad(key_name)?;
let index_data_bytes: Arc<Vec<u8>> = Arc::new(index_data);
let index_chunk_ranges = Arc::new(vec![0..index_data_bytes.len()]);
let index_pad_context = Context {
index: index.clone(),
network: network.clone(),
name: Arc::new(key_name.to_string()),
chunk_ranges: index_chunk_ranges,
data: index_data_bytes,
public,
encoding: DATA_ENCODING_PUBLIC_INDEX,
};
write_pipeline(
index_pad_context,
vec![index_pad],
no_verify,
put_callback.clone(),
)
.await?;
}
invoke_put_callback(&put_callback, PutEvent::Complete)
.await
.unwrap();
Ok(address)
}
/// Resumes an interrupted upload for `name`.
///
/// Decision cascade:
/// 1. If any recorded pad exceeds the current mode's scratchpad size, the key
///    was stored under a different mode — remove it and `first_store` fresh.
/// 2. If the pad count no longer matches the data's chunk count, delegate to
///    the efficient `update` path.
/// 3. If every checksum already matches, report completion without writing.
/// 4. Otherwise rewrite all pads through the pipeline (plus the public index
///    pad when applicable).
///
/// # Errors
/// Propagates index, pipeline, and callback errors.
pub async fn resume(
    index: Arc<RwLock<crate::index::master_index::MasterIndex>>,
    network: Arc<Network>,
    name: &str,
    data_bytes: Arc<Vec<u8>>,
    mode: StorageMode,
    public: bool,
    no_verify: bool,
    put_callback: Option<PutCallback>,
) -> Result<ScratchpadAddress, Error> {
    let pads = index.read().await.get_pads(name);

    // Oversized pads mean a storage-mode mismatch: start over from scratch.
    // Propagate a removal failure (`?`) instead of panicking (`unwrap`).
    if pads.iter().any(|p| p.size > mode.scratchpad_size()) {
        index.write().await.remove_key(name)?;
        return first_store(
            index,
            network,
            name,
            data_bytes,
            mode,
            public,
            no_verify,
            put_callback,
        )
        .await;
    }

    let chunk_ranges = index.read().await.chunk_data(&data_bytes, mode.clone());
    if pads.len() != chunk_ranges.len() {
        info!(
            "Resuming key '{}' with data size mismatch. Index has {} pads, current data requires {}. Using efficient update.",
            name,
            pads.len(),
            chunk_ranges.len()
        );
        return update(
            index,
            network,
            name,
            data_bytes,
            mode,
            public,
            no_verify,
            put_callback,
        )
        .await;
    }

    // Nothing changed: signal completion without touching the network.
    if index.read().await.verify_checksum(name, &data_bytes, mode.clone()) {
        info!("All checksums match for key '{}', no upload needed", name);
        invoke_put_callback(&put_callback, PutEvent::Complete).await?;
        return Ok(pads[0].address);
    }

    let encoding = if public {
        DATA_ENCODING_PUBLIC_DATA
    } else {
        DATA_ENCODING_PRIVATE_DATA
    };
    let context = Context {
        index: index.clone(),
        network: network.clone(),
        name: Arc::new(name.to_string()),
        data: data_bytes.clone(),
        chunk_ranges: Arc::new(chunk_ranges),
        public,
        encoding,
    };

    // Mark every pad Free so the pipeline rewrites all of them.
    let mut pads_to_write = pads.clone();
    for pad in &mut pads_to_write {
        pad.status = PadStatus::Free;
    }
    write_pipeline(context, pads_to_write, no_verify, put_callback.clone()).await?;

    if public {
        let (index_pad, index_data) = index.write().await.populate_index_pad(name)?;
        let index_data_bytes: Arc<Vec<u8>> = Arc::new(index_data);
        let index_chunk_ranges = Arc::new(vec![0..index_data_bytes.len()]);
        let index_pad_context = Context {
            index: index.clone(),
            network: network.clone(),
            name: Arc::new(name.to_string()),
            chunk_ranges: index_chunk_ranges,
            data: index_data_bytes,
            public,
            encoding: DATA_ENCODING_PUBLIC_INDEX,
        };
        write_pipeline(
            index_pad_context,
            vec![index_pad],
            no_verify,
            put_callback.clone(),
        )
        .await?;
    }

    // Emit the terminal Complete event after a successful write, matching
    // `update` and `first_store` (the original only emitted it on the
    // checksum-match short-circuit).
    invoke_put_callback(&put_callback, PutEvent::Complete).await?;
    Ok(pads[0].address)
}
/// Stores a brand-new key: creates the key's pads in the index, writes the
/// data pads through the pipeline, and — for public keys — writes the public
/// index pad afterwards.
///
/// Returns the address of the first data pad (the key's primary address).
///
/// # Errors
/// Propagates index, pipeline, and callback errors.
pub async fn first_store(
    index: Arc<RwLock<crate::index::master_index::MasterIndex>>,
    network: Arc<Network>,
    name: &str,
    data_bytes: Arc<Vec<u8>>,
    mode: StorageMode,
    public: bool,
    no_verify: bool,
    put_callback: Option<PutCallback>,
) -> Result<ScratchpadAddress, Error> {
    // Key creation allocates all pads and computes the chunk layout up front.
    let (pads, chunk_ranges) = index
        .write()
        .await
        .create_key(name, &data_bytes, mode, public)?;
    info!("Created key {} with {} pads", name, pads.len());

    let address = pads[0].address;
    let encoding = if public {
        DATA_ENCODING_PUBLIC_DATA
    } else {
        DATA_ENCODING_PRIVATE_DATA
    };
    let context = Context {
        index: index.clone(),
        network: network.clone(),
        name: Arc::new(name.to_string()),
        chunk_ranges: Arc::new(chunk_ranges),
        data: data_bytes.clone(),
        public,
        encoding,
    };
    write_pipeline(context, pads.clone(), no_verify, put_callback.clone()).await?;

    // The public index pad is written last so it references the final layout.
    if public {
        let (index_pad, index_data) = index.write().await.populate_index_pad(name)?;
        let index_data_bytes: Arc<Vec<u8>> = Arc::new(index_data);
        let index_chunk_ranges = Arc::new(vec![0..index_data_bytes.len()]);
        let index_pad_context = Context {
            index: index.clone(),
            network: network.clone(),
            name: Arc::new(name.to_string()),
            chunk_ranges: index_chunk_ranges,
            data: index_data_bytes,
            public,
            encoding: DATA_ENCODING_PUBLIC_INDEX,
        };
        write_pipeline(
            index_pad_context,
            vec![index_pad],
            no_verify,
            put_callback.clone(),
        )
        .await?;
    }

    // Propagate a callback failure instead of panicking (`unwrap`): this
    // function already returns `Result<_, Error>`.
    invoke_put_callback(&put_callback, PutEvent::Complete).await?;
    Ok(address)
}