use std::collections::{BTreeMap, hash_map};
#[allow(unused)]
use lz4_flex::{
self, block, compress_prepend_size, decompress, decompress_into, decompress_size_prepended,
};
use crate::os::task::process::MemorySnapshotRegion;
use super::*;
const MEMORY_REGION_RESOLUTION: u64 = 512;
impl JournalEffector {
pub fn save_memory_and_snapshot(
ctx: &mut FunctionEnvMut<'_, WasiEnv>,
guard: &mut MutexGuard<'_, WasiProcessInner>,
trigger: SnapshotTrigger,
) -> anyhow::Result<()> {
let env = ctx.data();
let memory = unsafe { env.memory_view(ctx) };
let mut cur = 0u64;
let mut regions = Vec::<MemorySnapshotRegion>::new();
while cur < memory.data_size() {
let next = ((cur + MEMORY_REGION_RESOLUTION) / MEMORY_REGION_RESOLUTION)
* MEMORY_REGION_RESOLUTION;
let end = memory.data_size().min(next);
let region = cur..end;
regions.push(region.into());
cur = end;
}
let memory = unsafe { env.memory_view(ctx) };
let journal = ctx.data().active_journal()?;
let mut regions_phase2 = BTreeMap::new();
for region in regions.drain(..) {
#[cfg(not(feature = "sys"))]
let data = memory
.copy_range_to_vec(region.into())
.map_err(mem_error_to_wasi)?;
#[cfg(feature = "sys")]
let data = {
let d = unsafe { memory.data_unchecked() };
if region.end > d.len() as u64 {
return Err(anyhow::anyhow!(
"memory access out of bounds ({} vs {})",
region.end,
d.len()
));
}
&d[region.start as usize..region.end as usize]
};
let hash = {
let h: [u8; 32] = blake3::hash(data).into();
u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]])
};
match guard.snapshot_memory_hash.entry(region) {
hash_map::Entry::Occupied(mut val) => {
if *val.get() == hash {
continue;
}
val.insert(hash);
}
hash_map::Entry::Vacant(vacant) => {
vacant.insert(hash);
}
}
regions_phase2.insert(region, ());
}
regions.clear();
let mut last_end = None;
for region in regions_phase2.keys() {
if Some(region.start) == last_end {
regions.last_mut().unwrap().end = region.end;
} else {
regions.push(*region);
}
last_end = Some(region.end);
}
for region in regions {
#[cfg(not(feature = "sys"))]
let compressed_data = compress_prepend_size(
&memory
.copy_range_to_vec(region.into())
.map_err(mem_error_to_wasi)?,
);
#[cfg(feature = "sys")]
let compressed_data = compress_prepend_size(unsafe {
&memory.data_unchecked()[region.start as usize..region.end as usize]
});
journal
.write(JournalEntry::UpdateMemoryRegionV1 {
region: region.into(),
compressed_data: compressed_data.into(),
})
.map_err(map_snapshot_err)?;
}
let when = SystemTime::now();
journal
.write(JournalEntry::SnapshotV1 { when, trigger })
.map_err(map_snapshot_err)?;
journal.flush().map_err(map_snapshot_err)?;
Ok(())
}
pub unsafe fn apply_compressed_memory(
ctx: &mut FunctionEnvMut<'_, WasiEnv>,
region: Range<u64>,
compressed_data: &[u8],
) -> anyhow::Result<()> {
let (env, mut store) = ctx.data_and_store_mut();
let (uncompressed_size, compressed_data) = block::uncompressed_size(compressed_data)
.map_err(|err| anyhow::anyhow!("failed to decompress - {err}"))?;
let memory = unsafe { env.memory() };
memory.grow_at_least(&mut store, region.end + uncompressed_size as u64)?;
let memory = unsafe { env.memory_view(&store) };
#[cfg(not(feature = "sys"))]
{
let decompressed_data = decompress(compressed_data, uncompressed_size)?;
memory
.write(region.start, &decompressed_data)
.map_err(|err| WasiRuntimeError::Runtime(RuntimeError::user(err.into())))?;
let mut decompressed_data = &decompressed_data[..];
let mut offset = region.start;
while offset < region.end {
let next = region.end.min(offset + MEMORY_REGION_RESOLUTION);
let region = offset..next;
offset = next;
let size = region.end - region.start;
let hash = {
let h: [u8; 32] = blake3::hash(&decompressed_data[..size as usize]).into();
u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]])
};
env.process
.inner
.0
.lock()
.unwrap()
.snapshot_memory_hash
.insert(region.into(), hash);
decompressed_data = &decompressed_data[size as usize..];
}
}
#[cfg(feature = "sys")]
unsafe {
let start = region.start as usize;
let end = start + uncompressed_size;
decompress_into(
compressed_data,
&mut memory.data_unchecked_mut()[start..end],
)?;
let data = &memory.data_unchecked();
let mut offset = region.start;
while offset < region.end {
let next = region.end.min(offset + MEMORY_REGION_RESOLUTION);
let region = offset..next;
let hash = {
let h: [u8; 32] = blake3::hash(&data[offset as usize..next as usize]).into();
u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]])
};
env.process
.inner
.0
.lock()
.unwrap()
.snapshot_memory_hash
.insert(region.into(), hash);
offset = next;
}
}
Ok(())
}
}