use std::collections::HashMap;
use std::str::FromStr;
use scru128::Scru128Id;
use crate::processor::actor::Actor;
use crate::processor::lifecycle::{Event, Slots, ThresholdPick};
use crate::processor::{Lifecycle, LifecycleReader};
use crate::store::{FollowOption, Frame, ReadOptions, Store};
enum StartOutcome {
Spawned,
Invalid,
}
async fn try_start_actor(
frame: &Frame,
store: &Store,
name: &str,
) -> Result<StartOutcome, Box<dyn std::error::Error + Send + Sync>> {
match Actor::from_frame(frame, store).await {
Ok(actor) => {
actor.spawn(store.clone()).await?;
Ok(StartOutcome::Spawned)
}
Err(err) => {
let _ = store.append(
Frame::builder(format!("xs.actor.{name}.invalid"))
.meta(serde_json::json!({
"actor_id": frame.id.to_string(),
"error": err.to_string(),
}))
.build(),
);
Ok(StartOutcome::Invalid)
}
}
}
fn event_from_frame(frame: &Frame) -> Option<(String, Event)> {
let rest = frame.topic.strip_prefix("xs.actor.")?;
let (name, ev_tag) = split_actor_event(rest)?;
let event = match ev_tag {
"create" => Event::Create { id: frame.id },
"term" => Event::Term,
"active" => Event::Active {
source: source_id(frame)?,
},
"invalid" => Event::Invalid {
source: source_id(frame)?,
},
"fin.term" | "fin.error" | "fin.ok" => Event::Fin,
"replaced" => Event::Replaced,
"stopped" => Event::Stopped,
_ => return None,
};
Some((name.to_string(), event))
}
fn split_actor_event(rest: &str) -> Option<(&str, &str)> {
for tag in ["fin.term", "fin.error", "fin.ok"] {
if let Some(name) = rest.strip_suffix(&format!(".{tag}")) {
return Some((name, tag));
}
}
for tag in ["create", "term", "active", "invalid", "replaced", "stopped"] {
if let Some(name) = rest.strip_suffix(&format!(".{tag}")) {
return Some((name, tag));
}
}
None
}
fn source_id(frame: &Frame) -> Option<Scru128Id> {
let meta = frame.meta.as_ref()?;
let s = meta.get("actor_id").and_then(|v| v.as_str())?;
Scru128Id::from_str(s).ok()
}
#[derive(Default)]
struct TopicState {
slots: Slots,
frames: HashMap<Scru128Id, Frame>,
}
async fn execute_pick(
pick: ThresholdPick,
state: &TopicState,
topic: &str,
store: &Store,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (id, fallback) = match pick {
ThresholdPick::None => return Ok(()),
ThresholdPick::Start { id, fallback } => (id, fallback),
};
let Some(frame) = state.frames.get(&id) else {
return Ok(()); };
let outcome = try_start_actor(frame, store, topic).await?;
if matches!(outcome, StartOutcome::Invalid) {
if let Some(fb_id) = fallback {
if let Some(fb_frame) = state.frames.get(&fb_id) {
let _ = try_start_actor(fb_frame, store, topic).await?;
}
}
}
Ok(())
}
pub async fn run(store: Store) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let rx = store
.read(ReadOptions::builder().follow(FollowOption::On).build())
.await;
let mut lifecycle = LifecycleReader::new(rx);
let mut states: HashMap<String, TopicState> = HashMap::new();
while let Some(event) = lifecycle.recv().await {
match event {
Lifecycle::Historical(frame) => {
if let Some((topic, ev)) = event_from_frame(&frame) {
let state = states.entry(topic).or_default();
if let Event::Create { id } = &ev {
state.frames.insert(*id, frame.clone());
}
state.slots.apply(ev);
}
}
Lifecycle::Threshold(_) => {
let mut picks: Vec<(String, ThresholdPick)> = states
.iter()
.map(|(t, s)| (t.clone(), s.slots.threshold()))
.collect();
picks.sort_by_key(|(_, p)| match p {
ThresholdPick::Start { id, .. } => Some(*id),
ThresholdPick::None => None,
});
for (topic, pick) in picks {
if let Some(state) = states.get(&topic) {
execute_pick(pick, state, &topic, &store).await?;
}
}
}
Lifecycle::Live(frame) => {
if let Some((topic, ev)) = event_from_frame(&frame) {
let is_create = matches!(ev, Event::Create { .. });
let prior_confirmed = states.get(&topic).and_then(|s| s.slots.confirmed());
let state = states.entry(topic.clone()).or_default();
if let Event::Create { id } = &ev {
state.frames.insert(*id, frame.clone());
}
state.slots.apply(ev);
if is_create {
let outcome = try_start_actor(&frame, &store, &topic).await?;
if matches!(outcome, StartOutcome::Invalid) {
if let Some(fb_id) = prior_confirmed {
if let Some(fb_frame) = states
.get(&topic)
.and_then(|s| s.frames.get(&fb_id).cloned())
{
let _ = try_start_actor(&fb_frame, &store, &topic).await?;
}
}
}
}
}
}
}
}
Ok(())
}