use std::borrow::Cow;
use std::sync::Arc;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tracing::{debug, warn};
use crate::jsonio::{read_json, write_json};
use crate::misc::remove_item;
use crate::monitor::Monitor;
use crate::transport::ListDir;
use crate::*;
/// Name of the subdirectory, inside each band directory, that holds the band's index.
static INDEX_DIR: &str = "i";
/// Band format flags: string markers stored in a band's head file.
pub mod flags {
use std::borrow::Cow;
/// Flags used by `Band::create`, which takes no explicit flag list. Currently empty.
pub static DEFAULT: &[Cow<'static, str>] = &[];
/// Flags accepted when creating or opening a band. Currently empty, so any
/// flag found in a band head is reported as unsupported.
pub static SUPPORTED: &[&str] = &[];
}
/// Describes which band of an archive should be selected.
///
/// NOTE(review): the code that interprets this policy is not in this part of
/// the file; the variant descriptions are inferred from their names and
/// should be confirmed against the callers.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum BandSelectionPolicy {
/// Presumably: the most recent band that has been closed.
LatestClosed,
/// Presumably: the most recent band, whether or not it has been closed.
Latest,
/// The band with exactly this id.
Specified(BandId),
}
/// Builds the semver requirement a band's format version has to satisfy:
/// less than or equal to this crate's own version.
///
/// # Panics
///
/// Panics if `crate::VERSION` does not produce a parseable requirement.
fn band_version_requirement() -> semver::VersionReq {
    let requirement = format!("<={}", crate::VERSION);
    semver::VersionReq::parse(&requirement).unwrap()
}
/// Reports whether a band declaring format `version` can be read by this code.
///
/// Returns `true` only when `version` parses as a semantic version and
/// satisfies [`band_version_requirement`].
///
/// A string that is not valid semver yields `false` rather than a panic: the
/// value comes from a band head file on disk, so a damaged or foreign archive
/// must surface as an "unsupported version" error, not crash the process.
fn band_version_supported(version: &str) -> bool {
    match semver::Version::parse(version) {
        Ok(parsed) => band_version_requirement().matches(&parsed),
        // Unparseable version: we cannot claim to understand this band.
        Err(_) => false,
    }
}
/// One band of an archive, accessed through a transport rooted at the band's
/// own directory.
#[derive(Debug)]
pub struct Band {
/// Identifier of this band within the archive.
band_id: BandId,
/// Transport addressing the band directory (a sub-transport of the archive's).
transport: Arc<dyn Transport>,
/// Contents of the band head file: written on creation, read back on open.
head: Head,
}
/// Metadata stored as JSON in the band head file when a band is created.
#[derive(Debug, Serialize, Deserialize)]
struct Head {
/// Time the band was created, as a Unix timestamp.
start_time: i64,
/// Format version declared for this band, if the head records one.
band_format_version: Option<String>,
/// Format flags recorded for this band; treated as empty when the field is
/// absent from the file.
#[serde(default)]
format_flags: Vec<Cow<'static, str>>,
}
/// Metadata stored as JSON in the band tail file when a band is closed.
#[derive(Debug, Serialize, Deserialize)]
struct Tail {
/// Time the band was closed, as a Unix timestamp.
end_time: i64,
/// Index hunk count passed to `Band::close`; optional in the file.
index_hunk_count: Option<u64>,
}
/// Summary of a band's metadata, as produced by `Band::get_info`.
pub struct Info {
/// Identifier of the band.
pub id: BandId,
/// True when a tail file was found for the band.
pub is_closed: bool,
/// Start time taken from the band head.
pub start_time: OffsetDateTime,
/// End time taken from the band tail; `None` when there is no tail.
pub end_time: Option<OffsetDateTime>,
/// Index hunk count recorded in the tail, when a tail exists and records it.
pub index_hunk_count: Option<u64>,
}
impl Band {
pub fn create(archive: &Archive) -> Result<Band> {
Band::create_with_flags(archive, flags::DEFAULT)
}
/// Creates a new band in `archive`, recording `format_flags` in its head file.
///
/// The band gets the id following the archive's last band, or the zero id when
/// the archive has no bands yet. The band directory and its index subdirectory
/// are created first, then the head file is written.
///
/// The recorded format version is `0.6.3` when no flags are given and
/// `23.2.0` otherwise.
///
/// # Panics
///
/// Panics if any entry of `format_flags` is not listed in `flags::SUPPORTED`.
///
/// # Errors
///
/// Returns an error if looking up the last band id, creating either directory,
/// or writing the head file fails.
pub fn create_with_flags(
    archive: &Archive,
    format_flags: &[Cow<'static, str>],
) -> Result<Band> {
    // Reject unknown flags before touching storage.
    for flag in format_flags {
        assert!(
            flags::SUPPORTED.contains(&flag.as_ref()),
            "unknown flag {flag:?}"
        );
    }

    let band_id = match archive.last_band_id()? {
        Some(last) => last.next_sibling(),
        None => BandId::zero(),
    };

    let transport = archive.transport().sub_transport(&band_id.to_string());
    transport.create_dir("")?;
    transport.create_dir(INDEX_DIR)?;

    let version = if format_flags.is_empty() {
        "0.6.3"
    } else {
        "23.2.0"
    };
    let head = Head {
        start_time: OffsetDateTime::now_utc().unix_timestamp(),
        band_format_version: Some(version.to_owned()),
        format_flags: format_flags.to_vec(),
    };
    write_json(&transport, BAND_HEAD_FILENAME, &head)?;

    Ok(Band {
        band_id,
        head,
        transport,
    })
}
pub fn close(&self, index_hunk_count: u64) -> Result<()> {
write_json(
&self.transport,
BAND_TAIL_FILENAME,
&Tail {
end_time: OffsetDateTime::now_utc().unix_timestamp(),
index_hunk_count: Some(index_hunk_count),
},
)
.map_err(Error::from)
}
pub fn open(archive: &Archive, band_id: BandId) -> Result<Band> {
let transport = archive.transport().sub_transport(&band_id.to_string());
let head: Head =
read_json(&transport, BAND_HEAD_FILENAME)?.ok_or(Error::BandHeadMissing { band_id })?;
if let Some(version) = &head.band_format_version {
if !band_version_supported(version) {
return Err(Error::UnsupportedBandVersion {
band_id,
version: version.to_owned(),
});
}
} else {
debug!("Old(?) band {band_id} has no format version");
}
let unsupported_flags = head
.format_flags
.iter()
.filter(|f| !flags::SUPPORTED.contains(&f.as_ref()))
.cloned()
.collect_vec();
if !unsupported_flags.is_empty() {
return Err(Error::UnsupportedBandFormatFlags {
band_id,
unsupported_flags,
});
}
Ok(Band {
band_id: band_id.to_owned(),
head,
transport,
})
}
pub fn delete(archive: &Archive, band_id: BandId) -> Result<()> {
archive
.transport()
.remove_dir_all(&band_id.to_string())
.map_err(|err| {
if err.is_not_found() {
Error::BandNotFound { band_id }
} else {
Error::from(err)
}
})
}
/// Reports whether the band is closed, i.e. whether its tail file exists.
///
/// # Errors
///
/// Returns an error if the transport cannot determine whether the file exists.
pub fn is_closed(&self) -> Result<bool> {
    let has_tail = self.transport.is_file(BAND_TAIL_FILENAME)?;
    Ok(has_tail)
}
/// Returns this band's identifier.
pub fn id(&self) -> BandId {
self.band_id
}
/// Returns the format version recorded in the band head, or `None` if the
/// head does not declare one.
pub fn band_format_version(&self) -> Option<&str> {
self.head.band_format_version.as_deref()
}
/// Returns the format flags recorded in the band head.
pub fn format_flags(&self) -> &[Cow<'static, str>] {
    self.head.format_flags.as_slice()
}
/// Returns an `IndexWriter` over this band's index subdirectory.
pub fn index_builder(&self) -> IndexWriter {
    let index_transport = self.transport.sub_transport(INDEX_DIR);
    IndexWriter::new(index_transport)
}
/// Returns an `IndexRead` opened on this band's index subdirectory.
pub fn index(&self) -> IndexRead {
    let index_transport = self.transport.sub_transport(INDEX_DIR);
    IndexRead::open(index_transport)
}
pub fn get_info(&self) -> Result<Info> {
let tail_option: Option<Tail> = read_json(&self.transport, BAND_TAIL_FILENAME)?;
let start_time =
OffsetDateTime::from_unix_timestamp(self.head.start_time).map_err(|_| {
Error::InvalidMetadata {
details: format!("Invalid band start timestamp {:?}", self.head.start_time),
}
})?;
let end_time = tail_option
.as_ref()
.map(|tail| {
OffsetDateTime::from_unix_timestamp(tail.end_time).map_err(|_| {
Error::InvalidMetadata {
details: format!("Invalid band end timestamp {:?}", tail.end_time),
}
})
})
.transpose()?;
Ok(Info {
id: self.band_id,
is_closed: tail_option.is_some(),
start_time,
end_time,
index_hunk_count: tail_option.as_ref().and_then(|tail| tail.index_hunk_count),
})
}
/// Checks the contents of the band directory.
///
/// A missing head file is reported to `monitor` as `Error::BandHeadMissing`.
/// Files other than the head and tail, and subdirectories other than the
/// index directory, are logged as warnings only.
///
/// # Errors
///
/// Returns an error only if the band directory cannot be listed.
pub fn validate(&self, monitor: Arc<dyn Monitor>) -> Result<()> {
    let ListDir { mut files, dirs } = self.transport.list_dir("")?;

    let head_present = files.contains(&BAND_HEAD_FILENAME.to_string());
    if !head_present {
        monitor.error(Error::BandHeadMissing {
            band_id: self.band_id,
        });
    }

    // Head and tail are the only expected files; whatever is left is unexpected.
    remove_item(&mut files, &BAND_HEAD_FILENAME);
    remove_item(&mut files, &BAND_TAIL_FILENAME);
    files.into_iter().for_each(|unexpected| {
        warn!(path = ?unexpected, "Unexpected file in band directory");
    });

    dirs.iter()
        .filter(|n| n != &INDEX_DIR)
        .for_each(|unexpected| {
            warn!(path = ?unexpected, "Unexpected subdirectory in band directory");
        });

    Ok(())
}
}
#[cfg(test)]
mod tests {
use std::fs;
use std::str::FromStr;
use std::time::Duration;
use serde_json::json;
use crate::test_fixtures::ScratchArchive;
use super::*;
// Creates a band, closes it, reopens it by id, and checks the on-disk layout
// and the summary info at each step.
#[test]
fn create_and_reopen_band() {
let af = ScratchArchive::new();
let band = Band::create(&af).unwrap();
// A freshly created band has a head and an index directory but no tail.
let band_dir = af.path().join("b0000");
assert!(band_dir.is_dir());
assert!(band_dir.join("BANDHEAD").is_file());
assert!(!band_dir.join("BANDTAIL").exists());
assert!(band_dir.join("i").is_dir());
assert!(!band.is_closed().unwrap());
// Closing writes the tail, which is what marks the band as closed.
band.close(0).unwrap();
assert!(band_dir.join("BANDTAIL").is_file());
assert!(band.is_closed().unwrap());
// Reopening by id sees the same closed state.
let band_id = BandId::from_str("b0000").unwrap();
let band2 = Band::open(&af, band_id).expect("failed to re-open band");
assert!(band2.is_closed().unwrap());
let info = band2.get_info().expect("get_info failed");
assert_eq!(info.id.to_string(), "b0000");
assert!(info.is_closed);
assert_eq!(info.index_hunk_count, Some(0));
// Start and end were both taken moments ago, so they must be close together.
let dur = info.end_time.expect("info has an end_time") - info.start_time;
assert!(dur < Duration::from_secs(5));
}
// Deleting a band removes its directory, including the head file.
#[test]
fn delete_band() {
    let head_path = "b0000/BANDHEAD";
    let af = ScratchArchive::new();
    let _band = Band::create(&af).unwrap();
    assert!(af.transport().is_file(head_path).unwrap());

    Band::delete(&af, BandId::new(&[0])).expect("delete band");

    assert!(!af.transport().is_file("b0000").unwrap());
    assert!(!af.transport().is_file(head_path).unwrap());
}
// Opening a band whose head declares a far-future format version must fail
// with an "unsupported band version" error.
#[test]
fn unsupported_band_version() {
    let af = ScratchArchive::new();
    let band_dir = af.path().join("b0000");
    fs::create_dir(&band_dir).unwrap();

    // Hand-write a head file, bypassing `Band::create`.
    let head_json = json!({
        "start_time": 0,
        "band_format_version": "8888.8.8",
    })
    .to_string();
    fs::write(band_dir.join(BAND_HEAD_FILENAME), head_json).unwrap();

    let e_str = Band::open(&af, BandId::zero()).unwrap_err().to_string();
    assert!(
        e_str.contains("Unsupported band version \"8888.8.8\" in b0000"),
        "bad band version: {e_str:#?}"
    );
}
}