use std::sync::Arc;
use thiserror::Error;
use tokio::sync::broadcast;
use crate::error::RuntimeError;
use crate::lifecycle::manager::LifecycleManager;
use crate::lifecycle::status::LifecycleEvent;
use crate::lifecycle::view::{ResourceStatus, ResourceView, image_label, last_error_from};
use crate::runtime::{ContainerRuntime, LogChunkStream};
/// Errors returned by [`LifecycleHandle`] operations.
#[derive(Debug, Error)]
pub enum LifecycleHandleError {
/// The requested resource could not be resolved; the payload is the resource
/// name that is interpolated into the message.
#[error("resource `{0}` does not exist in the current plan")]
UnknownResource(String),
/// The requested operation is not available on this handle; the payload is the
/// operation name that is interpolated into the message.
#[error("operation `{0}` is not supported by this handle yet")]
NotSupported(&'static str),
/// A failure reported as a `RuntimeError`. Display and source are delegated to
/// the wrapped error (`transparent`), and `#[from]` provides the `From`
/// conversion used by `?`.
#[error(transparent)]
Runtime(#[from] RuntimeError),
}
/// Abstraction over inspecting and controlling managed resources.
///
/// Implementors must be `Send + Sync`, and every asynchronous method returns a
/// `Send` future. `ManagerHandle` in this file is one implementation.
pub trait LifecycleHandle: Send + Sync {
/// Returns a [`ResourceView`] for each resource the handle knows about.
///
/// # Errors
///
/// Fails with a [`LifecycleHandleError`] when the views cannot be produced.
fn list(
&self,
) -> impl std::future::Future<Output = Result<Vec<ResourceView>, LifecycleHandleError>> + Send;
/// Returns the [`ResourceView`] of the resource called `name`.
///
/// # Errors
///
/// Fails with a [`LifecycleHandleError`] when the resource cannot be resolved.
fn get(
&self,
name: &str,
) -> impl std::future::Future<Output = Result<ResourceView, LifecycleHandleError>> + Send;
/// Restarts the resource called `name`.
///
/// # Errors
///
/// Fails with a [`LifecycleHandleError`] when the restart cannot be carried out.
fn restart(
&self,
name: &str,
) -> impl std::future::Future<Output = Result<(), LifecycleHandleError>> + Send;
/// Opens a log stream for the resource called `name`.
///
/// NOTE(review): `follow` presumably keeps the stream open for new output;
/// `ManagerHandle` simply forwards it to the runtime — confirm there.
///
/// # Errors
///
/// Fails with a [`LifecycleHandleError`] when no stream can be opened.
fn logs(
&self,
name: &str,
follow: bool,
) -> impl std::future::Future<Output = Result<LogChunkStream, LifecycleHandleError>> + Send;
/// Returns a new broadcast receiver of [`LifecycleEvent`]s.
fn subscribe_events(&self) -> broadcast::Receiver<LifecycleEvent>;
}
/// Handle that wraps a shared, reference-counted `LifecycleManager` for the
/// container runtime `R`.
pub struct ManagerHandle<R: ContainerRuntime + 'static> {
// Shared manager; cloning the handle only clones this `Arc`.
inner: Arc<LifecycleManager<R>>,
}
// Implemented by hand because `#[derive(Clone)]` would add an `R: Clone` bound,
// which is unnecessary when only the `Arc` is duplicated.
impl<R: ContainerRuntime + 'static> Clone for ManagerHandle<R> {
    /// Returns a handle that shares the same underlying manager.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
impl<R: ContainerRuntime + 'static> ManagerHandle<R> {
#[must_use]
pub fn new(inner: Arc<LifecycleManager<R>>) -> Self {
Self { inner }
}
#[must_use]
pub fn manager(&self) -> &Arc<LifecycleManager<R>> {
&self.inner
}
}
impl<R: ContainerRuntime + 'static> LifecycleHandle for ManagerHandle<R> {
async fn list(&self) -> Result<Vec<ResourceView>, LifecycleHandleError> {
let plan = self.inner.plan_arc();
let mut out: Vec<ResourceView> = Vec::with_capacity(plan.nodes().len());
for node in plan.nodes() {
let snapshot = self
.inner
.snapshot(&node.name)
.ok_or_else(|| LifecycleHandleError::UnknownResource(node.name.clone()))?;
out.push(ResourceView {
name: node.name.clone(),
kind: node.kind.clone(),
status: ResourceStatus::from(&snapshot.status),
healthy: matches!(
snapshot.status,
crate::lifecycle::status::NodeStatus::Healthy
),
image: image_label(&node.spec.image),
started_at: snapshot.started_at,
last_error: last_error_from(&snapshot.status),
});
}
Ok(out)
}
async fn get(&self, name: &str) -> Result<ResourceView, LifecycleHandleError> {
let plan = self.inner.plan_arc();
let node = plan
.nodes()
.iter()
.find(|n| n.name == name)
.ok_or_else(|| LifecycleHandleError::UnknownResource(name.to_owned()))?;
let snapshot = self
.inner
.snapshot(name)
.ok_or_else(|| LifecycleHandleError::UnknownResource(name.to_owned()))?;
Ok(ResourceView {
name: node.name.clone(),
kind: node.kind.clone(),
status: ResourceStatus::from(&snapshot.status),
healthy: matches!(
snapshot.status,
crate::lifecycle::status::NodeStatus::Healthy
),
image: image_label(&node.spec.image),
started_at: snapshot.started_at,
last_error: last_error_from(&snapshot.status),
})
}
async fn restart(&self, name: &str) -> Result<(), LifecycleHandleError> {
self.inner.restart_one(name).await.map_err(|err| match err {
crate::LifecycleError::ResourceNotFound(name) => {
LifecycleHandleError::UnknownResource(name)
}
crate::LifecycleError::Start { source, .. }
| crate::LifecycleError::Stop { source, .. } => LifecycleHandleError::Runtime(source),
crate::LifecycleError::SpecBuild { source, .. } => {
LifecycleHandleError::Runtime(RuntimeError::InvalidSpec(source.to_string()))
}
other => LifecycleHandleError::Runtime(RuntimeError::InvalidSpec(other.to_string())),
})
}
async fn logs(&self, name: &str, follow: bool) -> Result<LogChunkStream, LifecycleHandleError> {
let plan = self.inner.plan_arc();
if !plan.nodes().iter().any(|n| n.name == name) {
return Err(LifecycleHandleError::UnknownResource(name.to_owned()));
}
let snapshot = self
.inner
.snapshot(name)
.ok_or_else(|| LifecycleHandleError::UnknownResource(name.to_owned()))?;
let container_id = snapshot.container_id.ok_or_else(|| {
LifecycleHandleError::Runtime(RuntimeError::InvalidSpec(format!(
"resource `{name}` is not running"
)))
})?;
let stream = self.inner.runtime_arc().logs(&container_id, follow).await?;
Ok(stream)
}
fn subscribe_events(&self) -> broadcast::Receiver<LifecycleEvent> {
self.inner.subscribe_events()
}
}