use std::collections::HashMap;
use std::sync::Arc;
use super::executor::{ActorHandle, ActorSlot};
use super::resource::{
ResourceCtx, ResourceError, ResourceSnapshot, SnapshotStreamSpec, TransitionAffordance,
};
use super::transition::{ActorSpec, ResourceSpec, StreamKind};
pub(crate) struct Entry {
pub id: String,
pub kind: String,
pub resource_spec: ResourceSpec,
pub handle: ActorHandle,
pub task: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
}
#[derive(Default)]
pub struct ResourceDirectory {
entries: Vec<Arc<Entry>>,
by_id: HashMap<String, usize>,
}
impl ResourceDirectory {
pub fn new() -> Self {
Self::default()
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub(crate) fn entries(&self) -> &[Arc<Entry>] {
&self.entries
}
#[allow(dead_code)] pub(crate) fn get_by_id(&self, id: &str) -> Option<Arc<Entry>> {
self.by_id.get(id).map(|i| self.entries[*i].clone())
}
pub(crate) fn contains_id(&self, id: &str) -> bool {
self.by_id.contains_key(id)
}
pub(crate) fn insert(
&mut self,
id: String,
kind: String,
slot: ActorSlot,
resource_spec: ResourceSpec,
) -> Result<(), ResourceError> {
if self.by_id.contains_key(&id) {
return Err(ResourceError::Internal(format!(
"duplicate resource id: {id}"
)));
}
let entry = Arc::new(Entry {
id: id.clone(),
kind,
resource_spec,
handle: slot.handle,
task: tokio::sync::Mutex::new(Some(slot.task)),
});
self.by_id.insert(id, self.entries.len());
self.entries.push(entry);
Ok(())
}
}
impl Entry {
pub(crate) async fn snapshot(
&self,
ctx: ResourceCtx,
node: &str,
) -> Result<ResourceSnapshot, ResourceError> {
let mut snap = self.handle.snapshot(ctx).await?;
snap.id = self.id.clone();
snap.kind = self.kind.clone();
snap.node = node.to_string();
Ok(snap)
}
pub(crate) async fn actor_spec(
&self,
ctx: ResourceCtx,
node: &str,
) -> Result<ActorSpec, ResourceError> {
let transitions: Vec<TransitionAffordance> = match self.snapshot(ctx, node).await {
Ok(snapshot) => snapshot.transitions,
Err(ResourceError::Unavailable(_)) => Vec::new(),
Err(err) => return Err(err),
};
Ok(ActorSpec {
resource: self.resource_spec.clone(),
transitions: transitions
.into_iter()
.map(|affordance| affordance.spec)
.collect(),
})
}
pub(crate) fn unavailable_snapshot(&self, node: &str) -> ResourceSnapshot {
ResourceSnapshot {
id: self.id.clone(),
kind: self.kind.clone(),
name: self.resource_spec.name.clone(),
state: None,
node: node.to_string(),
properties: serde_json::Map::new(),
labels: self.resource_spec.labels.clone(),
transitions: Vec::new(),
streams: self
.resource_spec
.streams
.iter()
.map(|stream| SnapshotStreamSpec {
name: stream.name.clone(),
kind: match stream.kind {
StreamKind::Object => "object".into(),
StreamKind::Binary => "binary".into(),
},
})
.collect(),
revision: None,
metadata: serde_json::Map::new(),
}
}
}