use crate::registry::cosign::{CosignVerifier, SignatureStatus};
use crate::registry::types::{
media_types, PluginMetadataJson, PluginReference, RegistryAuth, RegistryConfig,
};
use anyhow::{bail, Context, Result};
use log::{info, warn};
use oci_client::client::{ClientConfig, ClientProtocol};
use oci_client::Reference;
use std::path::{Path, PathBuf};
#[derive(Debug)]
pub struct DownloadResult {
pub path: PathBuf,
pub verification: SignatureStatus,
}
const PLUGIN_DIRECTORY_PACKAGE: &str = "drasi-plugin-directory";
pub struct OciRegistryClient {
client: oci_client::Client,
config: RegistryConfig,
verifier: CosignVerifier,
}
impl OciRegistryClient {
pub fn new(config: RegistryConfig) -> Self {
let client_config = ClientConfig {
protocol: ClientProtocol::Https,
..Default::default()
};
Self {
client: oci_client::Client::new(client_config),
config,
verifier: CosignVerifier::new(Default::default()),
}
}
pub fn with_verifier(config: RegistryConfig, verifier: CosignVerifier) -> Self {
let client_config = ClientConfig {
protocol: ClientProtocol::Https,
..Default::default()
};
Self {
client: oci_client::Client::new(client_config),
config,
verifier,
}
}
pub fn auth(&self) -> oci_client::secrets::RegistryAuth {
match &self.config.auth {
RegistryAuth::Anonymous => oci_client::secrets::RegistryAuth::Anonymous,
RegistryAuth::Basic { username, password } => {
oci_client::secrets::RegistryAuth::Basic(username.clone(), password.clone())
}
}
}
pub async fn list_tags(&self, reference: &str) -> Result<Vec<String>> {
let parsed = PluginReference::parse(reference, &self.config.default_registry)?;
let oci_ref: Reference = parsed
.to_oci_reference()
.parse()
.context("invalid OCI reference")?;
self.list_tags_all(&oci_ref).await
}
async fn list_tags_all(&self, oci_ref: &Reference) -> Result<Vec<String>> {
const PAGE_SIZE: usize = 1000;
const MAX_TAGS: usize = 100_000;
let mut all_tags = Vec::new();
let mut last: Option<String> = None;
loop {
let response = match self
.client
.list_tags(oci_ref, &self.auth(), Some(PAGE_SIZE), last.as_deref())
.await
{
Ok(resp) => resp,
Err(e) => {
let err_str = format!("{e:#}");
if err_str.contains("invalid type: null") {
warn!(
"Registry returned null tags for {}; treating as empty",
oci_ref
);
break;
}
return Err(e).context("failed to list tags");
}
};
let page = response.tags;
if page.is_empty() {
break;
}
let new_last = page.last().cloned();
if new_last == last {
warn!(
"Registry pagination cursor did not advance for {}; stopping to avoid infinite loop",
oci_ref
);
break;
}
last = new_last;
all_tags.extend(page);
if all_tags.len() >= MAX_TAGS {
warn!(
"Tag listing for {} reached safety cap of {} tags; results may be incomplete",
oci_ref, MAX_TAGS
);
break;
}
}
Ok(all_tags)
}
pub async fn fetch_manifest_annotations(
&self,
reference: &str,
) -> Result<std::collections::BTreeMap<String, String>> {
let oci_ref: Reference = reference.parse().context("invalid OCI reference")?;
let (manifest, _digest) = self
.client
.pull_manifest(&oci_ref, &self.auth())
.await
.context("failed to pull manifest")?;
match manifest {
oci_client::manifest::OciManifest::Image(img) => {
Ok(img.annotations.unwrap_or_default())
}
oci_client::manifest::OciManifest::ImageIndex(_) => {
bail!("expected image manifest, got image index; specify a platform-specific tag")
}
}
}
pub async fn fetch_metadata(&self, reference: &str) -> Result<PluginMetadataJson> {
let oci_ref: Reference = reference.parse().context("invalid OCI reference")?;
let image_data = self
.client
.pull(
&oci_ref,
&self.auth(),
vec![
media_types::PLUGIN_METADATA,
media_types::PLUGIN_BINARY,
media_types::PLUGIN_CONFIG,
],
)
.await
.context("failed to pull artifact")?;
for layer in &image_data.layers {
if layer.media_type == media_types::PLUGIN_METADATA {
let metadata: PluginMetadataJson = serde_json::from_slice(&layer.data)
.context("failed to parse plugin metadata JSON")?;
return Ok(metadata);
}
}
bail!("no metadata layer found in artifact")
}
pub async fn download_plugin(
&self,
reference: &str,
dest_dir: &Path,
filename: &str,
) -> Result<DownloadResult> {
let oci_ref: Reference = reference.parse().context("invalid OCI reference")?;
let verification = self.verifier.verify_plugin(reference, &self.auth()).await;
info!("Downloading plugin from {reference}...");
let image_data = self
.client
.pull(
&oci_ref,
&self.auth(),
vec![
media_types::PLUGIN_METADATA,
media_types::PLUGIN_BINARY,
media_types::PLUGIN_CONFIG,
],
)
.await
.context("failed to pull artifact")?;
for layer in &image_data.layers {
if layer.media_type == media_types::PLUGIN_BINARY {
let dest_path = dest_dir.join(filename);
tokio::fs::create_dir_all(dest_dir)
.await
.context("failed to create destination directory")?;
tokio::fs::write(&dest_path, &layer.data)
.await
.context("failed to write plugin binary")?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let perms = std::fs::Permissions::from_mode(0o755);
std::fs::set_permissions(&dest_path, perms)
.context("failed to set executable permission")?;
}
let size_mb = layer.data.len() as f64 / 1_048_576.0;
info!(
"Downloaded {} ({:.1} MB) → {}",
reference,
size_mb,
dest_path.display()
);
return Ok(DownloadResult {
path: dest_path,
verification,
});
}
}
bail!("no binary layer found in artifact")
}
pub async fn get_digest(&self, reference: &str) -> Result<String> {
let oci_ref: Reference = reference.parse().context("invalid OCI reference")?;
let (_manifest, digest) = self
.client
.pull_manifest(&oci_ref, &self.auth())
.await
.context("failed to pull manifest")?;
Ok(digest)
}
pub fn verifier(&self) -> &CosignVerifier {
&self.verifier
}
pub fn expand_reference(&self, reference: &str) -> Result<String> {
let parsed = PluginReference::parse(reference, &self.config.default_registry)?;
Ok(parsed.to_oci_reference())
}
pub async fn search_plugins(&self, query: &str) -> Result<Vec<PluginSearchResult>> {
use crate::registry::platform::strip_arch_suffix;
let dir_ref = format!(
"{}/{}",
self.config.default_registry, PLUGIN_DIRECTORY_PACKAGE
);
let dir_oci_ref: Reference = dir_ref.parse().context("invalid directory reference")?;
let dir_tags = self
.list_tags_all(&dir_oci_ref)
.await
.context("failed to list plugin directory — directory package may not exist yet")?;
let mut candidates: Vec<(String, String)> = Vec::new();
for tag in &dir_tags {
if let Some((ptype, kind)) = tag.split_once('.') {
let plugin_ref = format!("{}/{}", ptype, kind);
let matches = if query.is_empty() || query == "*" {
true
} else if query.contains('/') {
plugin_ref == query
} else {
kind.contains(query)
};
if matches {
candidates.push((ptype.to_string(), kind.to_string()));
}
}
}
let mut results = Vec::new();
for (ptype, kind) in &candidates {
let plugin_ref = format!("{}/{}", ptype, kind);
match self.list_tags(&plugin_ref).await {
Ok(tags) => {
let mut version_map: std::collections::BTreeMap<String, Vec<String>> =
std::collections::BTreeMap::new();
for tag in &tags {
if let Some((version, suffix)) = strip_arch_suffix(tag) {
version_map
.entry(version.to_string())
.or_default()
.push(suffix.to_string());
}
}
let versions: Vec<PluginVersionInfo> = version_map
.into_iter()
.map(|(version, platforms)| PluginVersionInfo { version, platforms })
.collect();
results.push(PluginSearchResult {
reference: plugin_ref.clone(),
full_reference: self.expand_reference(&plugin_ref).unwrap_or_default(),
versions,
});
}
Err(_) => {
}
}
}
Ok(results)
}
}
#[derive(Debug, Clone)]
pub struct PluginSearchResult {
pub reference: String,
pub full_reference: String,
pub versions: Vec<PluginVersionInfo>,
}
#[derive(Debug, Clone)]
pub struct PluginVersionInfo {
pub version: String,
pub platforms: Vec<String>,
}