use std::{collections::BTreeSet, io, path::PathBuf, sync::Arc};
use crate::{
hashseq::parse_hash_seq,
util::{
progress::{IdGenerator, ProgressSender},
BlobFormat, HashAndFormat, RpcError, Tag,
},
Hash,
};
use bao_tree::{blake3, ChunkNum};
use bytes::Bytes;
use futures::{future::BoxFuture, stream::LocalBoxStream, Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_io::AsyncSliceReader;
use range_collections::RangeSet2;
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncRead, sync::mpsc};
pub use bao_tree;
pub use range_collections;
/// The availability status of an entry in a store.
///
/// Fieldless, so it derives `Copy` in addition to `Clone` — callers can
/// pass it by value without cloning.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum EntryStatus {
    /// The entry is fully present in the store.
    Complete,
    /// The entry is present, but only partially.
    Partial,
    /// The entry is not in the store at all.
    NotFound,
}
/// An entry of a [`Map`] for one hash.
///
/// Entries are handed out by value, so they must be cheaply cloneable.
pub trait MapEntry<D: Map>: Clone + Send + Sync + 'static {
    /// The hash of this entry.
    fn hash(&self) -> blake3::Hash;
    /// The size of the entry's data, in bytes.
    fn size(&self) -> u64;
    /// Whether the entry's data is fully available.
    fn is_complete(&self) -> bool;
    /// The chunk ranges of the data that are currently available.
    fn available_ranges(&self) -> BoxFuture<'_, io::Result<RangeSet2<ChunkNum>>>;
    /// A bao outboard for this entry (see [`bao_tree::io::fsm::Outboard`]).
    fn outboard(&self) -> BoxFuture<'_, io::Result<D::Outboard>>;
    /// A random-access reader for this entry's data.
    fn data_reader(&self) -> BoxFuture<'_, io::Result<D::DataReader>>;
}
/// A readable map from hashes to entries.
pub trait Map: Clone + Send + Sync + 'static {
    /// The outboard type used by this map's entries.
    type Outboard: bao_tree::io::fsm::Outboard;
    /// The reader type for entry data.
    type DataReader: AsyncSliceReader;
    /// The entry type of this map.
    type Entry: MapEntry<Self>;
    /// Look up the entry for a hash, if present.
    fn get(&self, hash: &Hash) -> Option<Self::Entry>;
    /// The status of `hash` in this map: complete, partial, or not found.
    fn contains(&self, hash: &Hash) -> EntryStatus;
}
/// A [`MapEntry`] whose outboard and data can still be written to.
pub trait PartialMapEntry<D: PartialMap>: MapEntry<D> {
    /// A writer for this entry's outboard.
    fn outboard_mut(&self) -> BoxFuture<'_, io::Result<D::OutboardMut>>;
    /// A writer for this entry's data.
    fn data_writer(&self) -> BoxFuture<'_, io::Result<D::DataWriter>>;
}
/// A [`Map`] that can also hold entries that are not yet complete.
pub trait PartialMap: Map {
    /// The writable outboard type for partial entries.
    type OutboardMut: bao_tree::io::fsm::OutboardMut;
    /// The writer type for partial entry data.
    type DataWriter: iroh_io::AsyncSliceWriter;
    /// The type of partial (in-progress) entries.
    type PartialEntry: PartialMapEntry<Self>;
    /// Get the existing partial entry for `hash`, or create one with the
    /// given total size.
    fn get_or_create_partial(&self, hash: Hash, size: u64) -> io::Result<Self::PartialEntry>;
    /// Get the existing partial entry for `hash`, if any.
    fn get_partial(&self, hash: &Hash) -> Option<Self::PartialEntry>;
    /// Promote a finished partial entry to a complete one.
    fn insert_complete(&self, entry: Self::PartialEntry) -> BoxFuture<'_, io::Result<()>>;
}
/// Read-only store operations beyond looking up single entries.
pub trait ReadableStore: Map {
    /// List the hashes of all blobs in the store.
    ///
    /// NOTE(review): partial blobs are listed by [`Self::partial_blobs`];
    /// presumably this lists only complete ones — confirm in the impls.
    fn blobs(&self) -> Box<dyn Iterator<Item = Hash> + Send + Sync + 'static>;
    /// List all named tags together with what they point to.
    fn tags(&self) -> Box<dyn Iterator<Item = (Tag, HashAndFormat)> + Send + Sync + 'static>;
    /// List the targets of all live temp tags.
    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static>;
    /// Validate the store's contents, reporting progress on `tx`.
    fn validate(&self, tx: mpsc::Sender<ValidateProgress>) -> BoxFuture<'_, anyhow::Result<()>>;
    /// List the hashes of all partial (incomplete) blobs.
    fn partial_blobs(&self) -> Box<dyn Iterator<Item = Hash> + Send + Sync + 'static>;
    /// Export the blob `hash` to the file at `target`.
    ///
    /// `progress` is called with byte offsets during the export.
    /// NOTE(review): it returns `io::Result<()>` — presumably an `Err`
    /// aborts the export; confirm in the implementations.
    fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
    ) -> BoxFuture<'_, io::Result<()>>;
}
/// The writable part of a store: importing data, managing tags, and the
/// garbage-collection hooks (mark, sweep, liveness tracking).
pub trait Store: ReadableStore + PartialMap {
    /// Import data from the local file at `data` into the store.
    fn import_file(
        &self,
        data: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> BoxFuture<'_, io::Result<(TempTag, u64)>>;
    /// Import in-memory bytes into the store.
    fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> BoxFuture<'_, io::Result<TempTag>>;
    /// Import data from a stream of byte chunks.
    fn import_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> BoxFuture<'_, io::Result<(TempTag, u64)>>;
    /// Import data from an async reader.
    ///
    /// Default impl: adapt the reader to a chunk stream and delegate to
    /// [`Self::import_stream`].
    fn import_reader(
        &self,
        data: impl AsyncRead + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> BoxFuture<'_, io::Result<(TempTag, u64)>> {
        self.import_stream(tokio_util::io::ReaderStream::new(data), format, progress)
    }
    /// Set a named tag to `hash`, or delete it when `hash` is `None`.
    fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> BoxFuture<'_, io::Result<()>>;
    /// Create a new, automatically named tag pointing at `hash`.
    fn create_tag(&self, hash: HashAndFormat) -> BoxFuture<'_, io::Result<Tag>>;
    /// Create a temp tag protecting `value` for as long as it is alive.
    fn temp_tag(&self, value: HashAndFormat) -> TempTag;
    /// The gc mark phase: traverse tags, temp tags and `extra_roots`,
    /// recording every reachable hash as live. Progress is streamed as
    /// [`GcMarkEvent`]s.
    fn gc_mark<'a>(
        &'a self,
        extra_roots: impl IntoIterator<Item = io::Result<HashAndFormat>> + 'a,
    ) -> LocalBoxStream<'a, GcMarkEvent> {
        Gen::new(|co| async move {
            // A task-level failure surfaces as a single terminal Error event.
            if let Err(cause) = gc_mark_task(self, extra_roots, &co).await {
                co.yield_(GcMarkEvent::Error(cause)).await;
            }
        })
        .boxed_local()
    }
    /// The gc sweep phase: delete every blob — complete or partial —
    /// that is not marked live. Progress is streamed as [`GcSweepEvent`]s.
    fn gc_sweep(&self) -> LocalBoxStream<'_, GcSweepEvent> {
        let candidates = self.blobs().chain(self.partial_blobs());
        Gen::new(|co| async move {
            let mut deleted = 0;
            for hash in candidates {
                // Live blobs are kept; everything else gets deleted.
                if self.is_live(&hash) {
                    continue;
                }
                match self.delete(&hash).await {
                    Ok(()) => deleted += 1,
                    Err(cause) => co.yield_(GcSweepEvent::Error(cause.into())).await,
                }
            }
            co.yield_(GcSweepEvent::CustomInfo(format!("deleted {} blobs", deleted)))
                .await;
        })
        .boxed_local()
    }
    /// Clear the set of live hashes recorded by the mark phase.
    fn clear_live(&self);
    /// Add hashes to the set of live hashes.
    fn add_live(&self, live: impl IntoIterator<Item = Hash>);
    /// Whether `hash` is currently marked live.
    fn is_live(&self, hash: &Hash) -> bool;
    /// Physically delete the blob for `hash` from the store.
    fn delete(&self, hash: &Hash) -> BoxFuture<'_, io::Result<()>>;
}
/// A hook that tracks how many [`TempTag`]s exist per [`HashAndFormat`].
///
/// [`TempTag`] calls `on_clone` on creation/clone and `on_drop` on drop,
/// so an implementation can keep a reference count per target.
pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static {
    /// Called when a temp tag for `inner` is created or cloned.
    fn on_clone(&self, inner: &HashAndFormat);
    /// Called when a temp tag for `inner` is dropped (unless leaked).
    fn on_drop(&self, inner: &HashAndFormat);
}
/// A hash-and-format pair that is pinned (counted live) for as long as
/// this value exists.
#[derive(Debug)]
pub struct TempTag {
    // The hash and format being protected.
    inner: HashAndFormat,
    // Tracker notified on clone and drop. `None` for a leaked tag, in
    // which case drop does nothing.
    liveness: Option<Arc<dyn LivenessTracker>>,
}
impl TempTag {
pub fn new(inner: HashAndFormat, liveness: Option<Arc<dyn LivenessTracker>>) -> Self {
if let Some(liveness) = liveness.as_ref() {
liveness.on_clone(&inner);
}
Self { inner, liveness }
}
pub fn inner(&self) -> &HashAndFormat {
&self.inner
}
pub fn hash(&self) -> &Hash {
&self.inner.0
}
pub fn format(&self) -> BlobFormat {
self.inner.1
}
pub fn leak(mut self) {
self.liveness = None;
}
}
impl Clone for TempTag {
    /// Clone by going through [`TempTag::new`], so the liveness
    /// tracker's `on_clone` hook fires for the new copy.
    fn clone(&self) -> Self {
        Self::new(self.inner, self.liveness.clone())
    }
}
impl Drop for TempTag {
    /// Notify the liveness tracker that this pin is gone.
    /// A leaked tag has no tracker, so nothing happens for it.
    fn drop(&mut self) {
        let Some(tracker) = self.liveness.as_ref() else {
            return;
        };
        tracker.on_drop(&self.inner);
    }
}
/// Walk all gc roots (named tags, temp tags, and `extra_roots`) and mark
/// every reachable hash as live in `store`, yielding progress events on
/// `co`.
///
/// Non-raw roots are parsed as hash sequences with [`parse_hash_seq`];
/// each child hash is marked live as well. Problems with individual
/// hashes (missing, partial, unreadable, unparseable) are reported as
/// warnings and skipped; only a failing `extra_roots` item aborts the
/// whole task with an error.
async fn gc_mark_task<'a>(
    store: &'a impl Store,
    extra_roots: impl IntoIterator<Item = io::Result<HashAndFormat>> + 'a,
    co: &Co<GcMarkEvent>,
) -> anyhow::Result<()> {
    // Local shorthands that yield info/warning events into the generator.
    macro_rules! info {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomInfo(format!($($arg)*))).await;
        };
    }
    macro_rules! warn {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomWarning(format!($($arg)*), None)).await;
        };
    }
    // Collect the root set: named tags, temp tags, and caller-supplied roots.
    let mut roots = BTreeSet::new();
    info!("traversing tags");
    for (name, haf) in store.tags() {
        info!("adding root {:?} {:?}", name, haf);
        roots.insert(haf);
    }
    info!("traversing temp roots");
    for haf in store.temp_tags() {
        info!("adding temp pin {:?}", haf);
        roots.insert(haf);
    }
    info!("traversing extra roots");
    for haf in extra_roots {
        // An error producing an extra root aborts the whole mark task.
        let haf = haf?;
        info!("adding extra root {:?}", haf);
        roots.insert(haf);
    }
    // `current` holds hashes still to expand; `live` accumulates every
    // reachable hash.
    // NOTE(review): nothing is ever pushed back into `current`, so the
    // outer while loop runs at most one pass — children of a hash
    // sequence are marked live but never expanded themselves. Confirm
    // this one-level traversal is intended.
    let mut current = roots.into_iter().collect::<Vec<_>>();
    let mut live: BTreeSet<Hash> = BTreeSet::new();
    while !current.is_empty() {
        for HashAndFormat(hash, format) in std::mem::take(&mut current) {
            // Only expand hashes not seen before, and only non-raw ones
            // (raw blobs have no children).
            if live.insert(hash) && !format.is_raw() {
                let Some(entry) = store.get(&hash) else {
                    warn!("gc: {} not found", hash);
                    continue;
                };
                if !entry.is_complete() {
                    warn!("gc: {} is partial", hash);
                    continue;
                }
                let Ok(reader) = entry.data_reader().await else {
                    warn!("gc: {} creating data reader failed", hash);
                    continue;
                };
                let Ok((mut iter, count)) = parse_hash_seq(reader).await else {
                    warn!("gc: {} parse failed", hash);
                    continue;
                };
                info!("parsed collection {} {:?}", hash, count);
                // Mark every child of the hash sequence live. A parse
                // error mid-sequence is a warning; children seen so far
                // stay marked.
                loop {
                    let item = match iter.next().await {
                        Ok(Some(item)) => item,
                        Ok(None) => break,
                        Err(_err) => {
                            warn!("gc: {} parse failed", hash);
                            break;
                        }
                    };
                    live.insert(item);
                }
            }
        }
    }
    info!("gc mark done. found {} live blobs", live.len());
    store.add_live(live);
    Ok(())
}
/// An event emitted during the gc mark phase (see [`Store::gc_mark`]).
#[derive(Debug)]
pub enum GcMarkEvent {
    /// A progress or informational message.
    CustomInfo(String),
    /// A non-fatal warning, optionally carrying its cause.
    CustomWarning(String, Option<anyhow::Error>),
    /// A fatal error; the mark phase stops after yielding this.
    Error(anyhow::Error),
}
/// An event emitted during the gc sweep phase (see [`Store::gc_sweep`]).
#[derive(Debug)]
pub enum GcSweepEvent {
    /// A progress or informational message.
    CustomInfo(String),
    /// A non-fatal warning, optionally carrying its cause.
    CustomWarning(String, Option<anyhow::Error>),
    /// A failure to delete one blob; the sweep continues with the rest.
    Error(anyhow::Error),
}
/// Progress events for importing data into the store
/// (see [`Store::import_file`] and the other `import_*` methods).
#[allow(missing_docs)]
#[derive(Debug)]
pub enum ImportProgress {
    /// Import operation `id` started for the source `name`.
    Found { id: u64, name: String },
    /// `offset` bytes have been copied so far for operation `id`.
    CopyProgress { id: u64, offset: u64 },
    /// The total size of operation `id` is now known.
    Size { id: u64, size: u64 },
    /// Outboard computation progress (`offset` bytes) for operation `id`.
    OutboardProgress { id: u64, offset: u64 },
    /// Outboard computation finished; the data's hash is known.
    OutboardDone { id: u64, hash: Hash },
}
/// How to bring a file's data into the store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ImportMode {
    /// Copy the data into the store (the default).
    #[default]
    Copy,
    /// Try to reference the source file in place instead of copying.
    TryReference,
}
/// How to get a blob's data out of the store and into a target file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExportMode {
    /// Copy the data out of the store (the default).
    #[default]
    Copy,
    /// Try to make the target reference the store's data in place
    /// instead of copying.
    TryReference,
}
/// Progress events for exporting a blob to the file system
/// (see [`ReadableStore::export`]).
#[allow(missing_docs)]
#[derive(Debug)]
pub enum ExportProgress {
    /// Export `id` of blob `hash` to `path` started.
    ///
    /// NOTE(review): the exact meaning of `stable` is not visible here —
    /// presumably whether the target location can be referenced rather
    /// than copied; confirm at the call sites.
    Start {
        id: u64,
        hash: Hash,
        path: PathBuf,
        stable: bool,
    },
    /// `offset` bytes have been written for export `id`.
    Progress { id: u64, offset: u64 },
    /// Export `id` finished.
    Done { id: u64 },
}
/// Progress events for validating the store's contents
/// (see [`ReadableStore::validate`]).
#[derive(Debug, Serialize, Deserialize)]
pub enum ValidateProgress {
    /// Validation started; `total` entries will be checked.
    Starting {
        total: u64,
    },
    /// An entry was found and assigned the operation id `id`.
    Entry {
        id: u64,
        hash: Hash,
        path: Option<String>,
        size: u64,
    },
    /// Validation progress (`offset` bytes) for entry `id`.
    Progress {
        id: u64,
        offset: u64,
    },
    /// Entry `id` finished, with an error message if it failed.
    Done {
        id: u64,
        error: Option<String>,
    },
    /// All entries have been processed.
    AllDone,
    /// The whole validation run was aborted with this error.
    Abort(RpcError),
}
/// Events emitted by the store.
///
/// Fieldless, so it derives `Copy` in addition to `Clone` — callers can
/// pass it by value without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    /// A garbage collection run has started.
    GcStarted,
    /// A garbage collection run has completed.
    GcCompleted,
}