pub mod composite;
pub mod fs;
pub mod oci;
use oci_distribution::client::ImageData;
use oci_distribution::secrets::RegistryAuth;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use async_trait::async_trait;
use oci_distribution::Reference;
use tracing::debug;
use crate::container::PullPolicy;
use crate::pod::Pod;
use crate::store::oci::Client;
/// A source of container module bytes addressed by OCI image [`Reference`].
///
/// Implementors provide [`Store::get`]; [`Store::fetch_pod_modules`] is a
/// provided method built on top of it.
#[async_trait]
pub trait Store: Sync {
    /// Returns the module bytes for `image_ref`.
    ///
    /// How `pull_policy` and `auth` are honoured is up to the implementor.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementor reports when the module cannot
    /// be produced.
    async fn get(
        &self,
        image_ref: &Reference,
        pull_policy: PullPolicy,
        auth: &RegistryAuth,
    ) -> anyhow::Result<Vec<u8>>;

    /// Fetches the module of every container in `pod`, keyed by container name.
    ///
    /// One future is built per container; all of them are driven concurrently
    /// with `join_all`, which waits for every future to finish before the
    /// results are inspected. For each container the registry credentials are
    /// resolved through `auth` and then passed to [`Store::get`] together with
    /// the container's effective pull policy.
    ///
    /// If two containers report the same name, the entry collected last
    /// replaces the earlier one in the returned map.
    ///
    /// # Errors
    ///
    /// Returns the first error (in container order) produced while resolving
    /// credentials or while calling [`Store::get`].
    ///
    /// # Panics
    ///
    /// Panics if a container's image cannot be parsed, if a container has no
    /// image, or if its pull policy cannot be determined.
    async fn fetch_pod_modules(
        &self,
        pod: &Pod,
        auth: &crate::secret::RegistryAuthResolver,
    ) -> anyhow::Result<HashMap<String, Vec<u8>>> {
        debug!(
            "Fetching all the container modules for pod '{}'",
            pod.name()
        );
        let all_containers = pod.all_containers();
        let container_module_futures = all_containers.iter().map(move |container| {
            let reference = container
                .image()
                .expect("Could not parse image.")
                .expect("FATAL ERROR: container must have an image");
            let pull_policy = container
                .effective_pull_policy()
                .expect("Could not identify pull policy.");
            async move {
                let registry_authentication = auth.resolve_registry_auth(&reference).await?;
                Ok((
                    container.name().to_string(),
                    // Borrow the credentials resolved just above for this image.
                    self.get(&reference, pull_policy, &registry_authentication)
                        .await?,
                ))
            }
        });
        // Collecting an iterator of `Result<(K, V)>` into `Result<HashMap<K, V>>`
        // yields the first `Err` encountered, otherwise the full map.
        futures::future::join_all(container_module_futures)
            .await
            .into_iter()
            .collect()
    }
}
/// A [`Store`] that pairs a local [`Storer`] with a registry [`Client`].
///
/// Both halves sit behind shared, async-aware locks: the storer behind a
/// read/write lock and the client behind a mutex.
pub struct LocalStore<S, C>
where
    S: Storer,
    C: Client,
{
    /// Local storage backend, guarded by a read/write lock.
    storer: Arc<RwLock<S>>,
    /// Registry client, guarded by a mutex.
    client: Arc<Mutex<C>>,
}
impl<S, C> LocalStore<S, C>
where
    S: Storer,
    C: Client,
{
    /// Pulls `image_ref` through the client and hands the result to the storer.
    ///
    /// The client lock is held only for the duration of the pull; the storer's
    /// write lock is then taken to persist the pulled data.
    ///
    /// # Errors
    ///
    /// Propagates any error from the client's `pull` or the storer's `store`.
    async fn pull(&self, image_ref: &Reference, auth: &RegistryAuth) -> anyhow::Result<()> {
        debug!("Pulling image ref '{:?}' from registry", image_ref);

        // The mutex guard is a temporary of this statement, so the client is
        // unlocked again before the storer is touched.
        let fetched = self.client.lock().await.pull(image_ref, auth).await?;

        let mut storer = self.storer.write().await;
        storer.store(image_ref, fetched).await
    }
}
#[async_trait]
impl<S, C> Store for LocalStore<S, C>
where
    S: Storer + Sync + Send,
    C: Client + Sync + Send,
{
    /// Returns the locally stored bytes for `image_ref`, pulling first when
    /// `pull_policy` requires it.
    ///
    /// * `IfNotPresent` pulls only when the storer does not report the image
    ///   as present.
    /// * `Always` asks the client for the current digest and pulls unless the
    ///   storer already reports the image as present with that digest.
    /// * `Never` does not pull.
    ///
    /// # Errors
    ///
    /// Propagates errors from fetching the digest, from pulling, and from the
    /// final local read.
    async fn get(
        &self,
        image_ref: &Reference,
        pull_policy: PullPolicy,
        auth: &RegistryAuth,
    ) -> anyhow::Result<Vec<u8>> {
        // Decide first, pull afterwards: every lock guard taken while deciding
        // is released before `pull` needs the client and the write lock.
        let must_pull = match pull_policy {
            PullPolicy::Never => false,
            PullPolicy::IfNotPresent => {
                let cached = self.storer.read().await.is_present(image_ref).await;
                !cached
            }
            PullPolicy::Always => {
                let remote_digest = self
                    .client
                    .lock()
                    .await
                    .fetch_digest(image_ref, auth)
                    .await?;
                let cached = self
                    .storer
                    .read()
                    .await
                    .is_present_with_digest(image_ref, remote_digest)
                    .await;
                !cached
            }
        };

        if must_pull {
            self.pull(image_ref, auth).await?;
        }

        self.storer.read().await.get_local(image_ref).await
    }
}
/// Local storage backend used by `LocalStore`.
///
/// `LocalStore` calls `store` under a write lock and the remaining methods
/// under a read lock.
#[async_trait]
pub trait Storer {
/// Saves `image_data` for `image_ref`.
///
/// `LocalStore` calls this with the data it has just pulled from the registry
/// client.
async fn store(&mut self, image_ref: &Reference, image_data: ImageData) -> anyhow::Result<()>;
/// Returns the locally held bytes for `image_ref`.
///
/// `LocalStore` returns this value directly as the result of `Store::get`.
/// NOTE(review): presumably an error is returned when nothing is stored for
/// `image_ref` - confirm against implementors.
async fn get_local(&self, image_ref: &Reference) -> anyhow::Result<Vec<u8>>;
/// Reports whether `image_ref` is present in local storage.
///
/// `LocalStore` skips the pull under `PullPolicy::IfNotPresent` when this
/// returns `true`.
async fn is_present(&self, image_ref: &Reference) -> bool;
/// Reports whether `image_ref` is present in local storage with the given `digest`.
///
/// `LocalStore` passes the digest obtained from the registry client and skips
/// the pull under `PullPolicy::Always` when this returns `true`.
async fn is_present_with_digest(&self, image_ref: &Reference, digest: String) -> bool;
}