use anyhow::Context;
use apt_cmd::{
fetch::{EventKind, FetcherExt},
AptGet,
};
use std::{path::Path, sync::Arc};
use tokio_stream::wrappers::ReceiverStream;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
const CONCURRENT_FETCHES: usize = 8;
let path = Path::new("./packages/");
let (fetch_tx, fetch_rx) = tokio::sync::mpsc::channel(CONCURRENT_FETCHES);
let packages = ReceiverStream::new(fetch_rx);
if !path.exists() {
tokio::fs::create_dir_all(path).await.unwrap();
}
let (fetcher, mut events) = async_fetcher::Fetcher::default()
.connections_per_file(4)
.into_package_fetcher()
.concurrent(CONCURRENT_FETCHES)
.fetch(packages, Arc::from(path));
let sender = async move {
let packages = AptGet::new()
.noninteractive()
.fetch_uris(&["full-upgrade"])
.await
.context("failed to spawn apt-get command")?
.context("failed to fetch package URIs from apt-get")?;
for package in packages {
let _ = fetch_tx.send(Arc::new(package)).await;
}
Ok::<(), anyhow::Error>(())
};
let receiver = async move {
while let Some(event) = events.recv().await {
eprintln!("{:#?}\n", event);
if let EventKind::Error(why) = event.kind {
return Err(why).context("package fetching failed");
}
}
eprintln!("finished");
Ok::<(), anyhow::Error>(())
};
let fetcher = async move {
fetcher.await;
Ok(())
};
futures::try_join!(fetcher, sender, receiver)?;
Ok(())
}