use std::fs::{self, File, OpenOptions};
use std::io::{self, BufWriter, Write};
use std::sync::atomic::{AtomicU64, Ordering};
use fault_injection::{fallible, maybe};
use crate::{
hash, write_trailer, DiskLocation, Map, Marble, Metadata, ObjectId, RelativeDiskLocation,
HEADER_LEN,
};
// Subdirectory of `config.path` in which all heap (object) files — both
// temporary and finalized — are created.
const HEAP_DIR_SUFFIX: &str = "heap";
// Generation tag assigned to fresh user writes. GC rewrites use values
// greater than this (see `shard_batch`'s `is_rewrite` check).
const NEW_WRITE_GENERATION: u8 = 0;
impl Marble {
    /// Writes a batch of objects to storage.
    ///
    /// Each item maps an [`ObjectId`] to either `Some(bytes)`
    /// (insert/update) or `None` (delete). The batch is forwarded to
    /// `shard_batch` tagged with `NEW_WRITE_GENERATION` and an empty
    /// `old_locations` map, marking it as a fresh user write rather than
    /// a GC rewrite.
    #[doc(alias = "insert")]
    #[doc(alias = "set")]
    #[doc(alias = "put")]
    pub fn write_batch<B, I>(&self, write_batch: I) -> io::Result<()>
    where
        B: AsRef<[u8]>,
        I: IntoIterator<Item = (ObjectId, Option<B>)>,
    {
        // Empty: nothing is being moved from a previous on-disk location.
        let old_locations = Map::default();
        self.shard_batch(write_batch, NEW_WRITE_GENERATION, &old_locations)
    }

    /// Partitions a write batch into per-shard sub-batches and writes each
    /// sub-batch out as its own file via `write_batch_inner`.
    ///
    /// * `generation` — `NEW_WRITE_GENERATION` (0) for user writes; greater
    ///   values indicate a GC rewrite of previously stored objects.
    /// * `old_locations` — for GC rewrites, the disk locations the objects
    ///   are being moved away from; empty for fresh user writes.
    pub(crate) fn shard_batch<B, I>(
        &self,
        write_batch: I,
        generation: u8,
        old_locations: &Map<ObjectId, DiskLocation>,
    ) -> io::Result<()>
    where
        B: AsRef<[u8]>,
        I: IntoIterator<Item = (ObjectId, Option<B>)>,
    {
        // shard id -> (estimated on-disk size in bytes, objects in that shard)
        let mut shards: Map<u8, (usize, Map<ObjectId, Option<B>>)> = Map::default();
        // During GC rewrites, shards that outgrow `target_file_size` are
        // split off into here so they get written as separate files.
        let mut fragmented_shards = vec![];
        let mut high_level_user_bytes_written = 0;
        let mut max_oid = 0;
        for (object_id, data_opt) in write_batch {
            max_oid = max_oid.max(object_id);
            let (object_size, shard_id) = if let Some(ref data) = data_opt {
                let len = data.as_ref().len();
                // Only user-initiated writes (empty `old_locations`) count
                // toward this statistic — bytes moved by GC do not.
                if old_locations.is_empty() {
                    high_level_user_bytes_written += len as u64;
                }
                let shard = if generation == NEW_WRITE_GENERATION {
                    // Fresh writes all land in shard 0; the configured
                    // partition function is only consulted for rewrites.
                    0
                } else {
                    (self.config.partition_function)(object_id, len)
                };
                // On-disk footprint includes the per-object header.
                (len + HEADER_LEN, shard)
            } else {
                // Deletes carry no payload and are routed to shard 0.
                (0, 0)
            };
            let shard = shards.entry(shard_id).or_default();
            let is_rewrite = generation > NEW_WRITE_GENERATION;
            let over_size_preference = shard.0 > self.config.target_file_size;
            // Only fragment during rewrites: a user batch must stay together,
            // but GC is free to split one shard across several output files.
            if is_rewrite && over_size_preference {
                fragmented_shards.push((shard_id, std::mem::take(&mut shard.1)));
                shard.0 = 0;
            }
            shard.0 += object_size;
            if let Some(Some(replaced)) = shard.1.insert(object_id, data_opt) {
                // NOTE(review): only the replaced payload length is
                // subtracted, not the HEADER_LEN that was added when it was
                // first inserted, so duplicate ids within one batch leave the
                // shard size over-counted by HEADER_LEN per duplicate. The
                // size only drives the fragmentation heuristic above.
                shard.0 -= replaced.as_ref().len();
            }
        }
        // Publish batch-level statistics before performing any file writes.
        self.high_level_user_bytes_written
            .fetch_add(high_level_user_bytes_written, Ordering::Relaxed);
        self.max_object_id.fetch_max(max_oid, Ordering::Release);
        // Write regular shards first, then any fragments split off above.
        let iter = shards
            .into_iter()
            .map(|(_shard, (_sz, objects))| objects)
            .chain(
                fragmented_shards
                    .into_iter()
                    .map(|(_shard, objects)| objects),
            );
        for objects in iter {
            self.write_batch_inner(objects, generation, &old_locations)?;
        }
        // fsync the heap directory so the renames performed by
        // `write_batch_inner` are themselves durable.
        if self.config.fsync_each_batch {
            fallible!(self.directory_lock.sync_all());
        }
        Ok(())
    }

    /// Writes one sub-batch of objects into a fresh on-disk file and
    /// installs the new locations into the in-memory location table.
    ///
    /// Sequence:
    /// 1. stream each object (crc, id, len header + payload) into a temp
    ///    file through a buffered writer,
    /// 2. register the file with `file_map`,
    /// 3. install each object's new location into `location_table` — via
    ///    compare-and-swap when rewriting (GC), or `fetch_max` for fresh
    ///    writes,
    /// 4. append the location trailer, optionally fsync, and rename the
    ///    temp file to its final metadata-derived name,
    /// 5. on failure, roll the location table back and delete the
    ///    partially installed file.
    fn write_batch_inner<B>(
        &self,
        objects: Map<ObjectId, Option<B>>,
        generation: u8,
        old_locations: &Map<ObjectId, DiskLocation>,
    ) -> io::Result<()>
    where
        B: AsRef<[u8]>,
    {
        // Process-local counter used only to generate unique temp file
        // names. NOTE(review): it restarts at 0 each process start, so
        // names could collide with leftovers from a crashed run —
        // presumably recovery clears stale "*-tmp" files; TODO confirm.
        static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
        assert!(!objects.is_empty());
        // A GC rewrite always has old locations; a fresh write never does.
        let is_gc = if generation == NEW_WRITE_GENERATION {
            assert!(old_locations.is_empty());
            false
        } else {
            assert!(!old_locations.is_empty());
            true
        };
        let tmp_file_name = format!("{}-tmp", TMP_COUNTER.fetch_add(1, Ordering::SeqCst));
        let tmp_path = self.config.path.join(HEAP_DIR_SUFFIX).join(tmp_file_name);
        let mut file_options = OpenOptions::new();
        file_options.read(true).write(true).create(true);
        let file = fallible!(file_options.open(&tmp_path));
        // 8 MiB buffer: objects are framed and written sequentially below.
        let mut buf_writer = BufWriter::with_capacity(8 * 1024 * 1024, file);
        // Offsets of each object within this file, recorded for the trailer
        // and for the absolute locations installed later.
        let mut new_relative_locations: Map<ObjectId, RelativeDiskLocation> = Map::default();
        let mut written_bytes: u64 = 0;
        for (object_id, raw_object_opt) in &objects {
            let raw_object = if let Some(raw_object) = raw_object_opt {
                raw_object.as_ref()
            } else {
                // Deletes are recorded as tombstone locations and write no
                // bytes to the file body.
                let is_delete = true;
                new_relative_locations.insert(*object_id, RelativeDiskLocation::new(0, is_delete));
                continue;
            };
            if raw_object.len() > self.config.max_object_size {
                return Err(io::Error::new(
                    io::ErrorKind::Unsupported,
                    format!(
                        "{:?} in write batch has a size of {}, which is larger than the \
                        configured `max_object_size` of {}. If this is intentional, please \
                        increase the configured `max_object_size`.",
                        object_id,
                        raw_object.len(),
                        self.config.max_object_size,
                    ),
                ));
            }
            let relative_address = written_bytes;
            let is_delete = false;
            let relative_location = RelativeDiskLocation::new(relative_address, is_delete);
            new_relative_locations.insert(*object_id, relative_location);
            // Per-object frame: crc | object id | payload length | payload,
            // with id and length little-endian encoded.
            let len_buf: [u8; 8] = (raw_object.len() as u64).to_le_bytes();
            let pid_buf: [u8; 8] = object_id.to_le_bytes();
            let crc = hash(len_buf, pid_buf, &raw_object);
            log::trace!(
                "writing object {} at offset {} with crc {:?}",
                object_id,
                written_bytes,
                crc
            );
            fallible!(buf_writer.write_all(&crc));
            fallible!(buf_writer.write_all(&pid_buf));
            fallible!(buf_writer.write_all(&len_buf));
            fallible!(buf_writer.write_all(&raw_object));
            written_bytes += (HEADER_LEN + raw_object.len()) as u64;
        }
        // Every object, including deletes, must have produced a location.
        assert_eq!(new_relative_locations.len(), objects.len());
        fallible!(buf_writer.flush());
        let file: File = buf_writer
            .into_inner()
            .expect("BufWriter::into_inner should not fail after an explicit flush");
        // Second handle to the same file: `file` is moved into `file_map`
        // below, while `file_2` is kept for the trailer write, fsync, and
        // length check at the end.
        let mut file_2: File = fallible!(file.try_clone());
        if self.config.fsync_each_batch {
            fallible!(file.sync_all());
        }
        self.bytes_written
            .fetch_add(written_bytes, Ordering::Relaxed);
        let initial_capacity = new_relative_locations.len() as u64;
        // Register the new file. `fam_claim` is presumably a guard keeping
        // the file-and-metadata entry alive until installation completes —
        // it is held until the explicit `drop` at the end; TODO confirm.
        let (base_location, fam_claim) = self.file_map.insert(
            file,
            written_bytes,
            initial_capacity,
            generation,
            is_gc,
            &self.config,
        );
        // Locations displaced by this batch (decremented from their old
        // files on success, or restored on failure).
        let mut replaced_locations: Vec<(ObjectId, DiskLocation)> = vec![];
        // GC objects whose CAS lost a race and must be dropped from the
        // trailer below.
        let mut failed_gc_locations = vec![];
        let mut subtract_from_len = 0;
        for (object_id, new_relative_location) in &new_relative_locations {
            #[cfg(feature = "runtime_validation")]
            let mut debug_history = self.debug_history.lock().unwrap();
            let new_location = new_relative_location.to_absolute(base_location.lsn());
            if let Some(old_location) = old_locations.get(&object_id) {
                // GC path: only install the new location if the object still
                // lives at the location we copied it from. A failed CAS means
                // a concurrent write superseded this object — skip it.
                let res = self
                    .location_table
                    .cas(*object_id, *old_location, new_location);
                match res {
                    Ok(()) => {
                        log::trace!(
                            "cas of {object_id} from old location {old_location:?} to new \
                            location {new_location:?} successful"
                        );
                        #[cfg(feature = "runtime_validation")]
                        {
                            debug_history.mark_add(*object_id, new_location);
                            debug_history.mark_remove(*object_id, *old_location);
                        }
                        replaced_locations.push((*object_id, *old_location));
                    }
                    Err(_current_opt) => {
                        log::trace!(
                            "cas of {object_id} from old location {old_location:?} to new \
                            location {new_location:?} failed"
                        );
                        failed_gc_locations.push(*object_id);
                        subtract_from_len += 1;
                    }
                }
            } else {
                // Fresh-write path: fetch_max presumably lets the location
                // with the higher lsn win against concurrent writers — TODO
                // confirm against `location_table`'s contract.
                let res = self.location_table.fetch_max(*object_id, new_location);
                if let Ok(old_opt) = res {
                    log::trace!(
                        "fetch_max of {object_id} to new location {new_location:?} successful"
                    );
                    #[cfg(feature = "runtime_validation")]
                    debug_history.mark_add(*object_id, new_location);
                    if let Some(old) = old_opt {
                        replaced_locations.push((*object_id, old));
                        #[cfg(feature = "runtime_validation")]
                        debug_history.mark_remove(*object_id, old);
                    }
                } else {
                    log::trace!("fetch_max of {object_id} to new location {new_location:?} failed");
                    subtract_from_len += 1;
                }
            };
        }
        // Drop every lost GC race from the trailer we are about to write.
        for failed_gc_location in &failed_gc_locations {
            new_relative_locations.remove(failed_gc_location).unwrap();
        }
        let trailer_items = new_relative_locations.len();
        if trailer_items == 0 {
            // Nothing survived installation: discard the file entirely.
            self.file_map
                .delete_partially_installed_fam(base_location, tmp_path);
            return Ok(());
        }
        // Body bytes plus trailer framing: 4 + 8 fixed bytes and 16 bytes
        // per entry — presumably crc + header plus (id, location) pairs;
        // must match what `write_trailer` emits (asserted below).
        let expected_file_len = written_bytes + 4 + 8 + (16 * new_relative_locations.len() as u64);
        let metadata = Metadata {
            lsn: base_location.lsn(),
            trailer_offset: written_bytes,
            present_objects: objects.len() as u64,
            generation,
            file_size: expected_file_len,
        };
        // The final file name encodes the metadata above.
        let file_name = metadata.to_file_name();
        let new_path = self.config.path.join(HEAP_DIR_SUFFIX).join(file_name);
        log::trace!(
            "writing trailer for {} at offset {}, trailer items {trailer_items}",
            base_location.lsn(),
            written_bytes,
        );
        // Trailer, fsync, then atomic rename into the final name. `maybe!`
        // marks the fsync/rename as additional fault-injection points.
        let res = write_trailer(&mut file_2, written_bytes, &new_relative_locations)
            .and_then(|_| maybe!(file_2.sync_all()))
            .and_then(|_| maybe!(fs::rename(&tmp_path, &new_path)));
        assert_eq!(trailer_items, new_relative_locations.len());
        if let Err(e) = res {
            // Roll back: try to restore each replaced location. A failed CAS
            // here just means a concurrent writer moved on — ignore it.
            for (object_id, old_location) in replaced_locations {
                let new_relative_location = new_relative_locations.get(&object_id).unwrap();
                let new_location = new_relative_location.to_absolute(base_location.lsn());
                let _dont_care = self
                    .location_table
                    .cas(object_id, new_location, old_location);
            }
            self.file_map
                .delete_partially_installed_fam(base_location, tmp_path);
            log::error!("failed to write new file: {:?}", e);
            return Err(e);
        };
        // Sanity-check that the trailer math above matched reality.
        let file_len = fallible!(file_2.metadata()).len();
        assert_eq!(file_len, expected_file_len);
        log::trace!("renamed file to {:?}", new_path);
        // Account the displaced objects against their old files (candidates
        // for future GC) and finalize this file's entry.
        self.file_map
            .decrement_evacuated_fams(base_location, replaced_locations);
        self.file_map
            .finalize_fam(base_location, metadata, subtract_from_len, new_path);
        drop(fam_claim);
        Ok(())
    }
}