use super::{backend_manager::BackendManager, runtime::Runtime, state_store::StateStore};
use crate::util::GuardHandle;
use anyhow::Result;
use chrono::Utc;
use dashmap::DashMap;
use futures_util::{future::join_all, StreamExt};
use plane_common::{
exponential_backoff::ExponentialBackoff,
names::BackendName,
protocol::{BackendAction, BackendEventId, BackendStateMessage},
types::{BackendState, BackendStatus, TerminationReason},
};
use std::{
net::IpAddr,
sync::{Arc, Mutex},
};
use valuable::Valuable;
/// Owns the set of live backends and the persistent record of their state changes.
///
/// NOTE(review): field order also determines drop order, so keep it stable unless
/// the teardown sequence is deliberately being changed.
pub struct Executor {
/// Shared handle to the runtime; it is used to terminate backends and is the
/// source of the backend event stream consumed by the listener task.
pub runtime: Arc<Box<dyn Runtime>>,
/// Mutex-guarded store in which backend state events are registered and acked.
state_store: Arc<Mutex<StateStore>>,
/// Managers keyed by backend name. Entries are inserted on a spawn action and
/// removed when the runtime emits an event for that backend.
backends: Arc<DashMap<BackendName, Arc<BackendManager>>>,
/// Address handed to every `BackendManager` created by this executor.
ip: IpAddr,
/// Handle for the task that drains the runtime's event stream.
/// NOTE(review): presumably stops that task when dropped — confirm against `GuardHandle`.
_backend_event_listener: GuardHandle,
}
impl Executor {
/// Builds an executor around `runtime` and `state_store`.
///
/// Before returning, every backend the state store still lists as active is
/// terminated, and a listener task is started that reacts to events from the
/// runtime: the backend named by each event is removed from the backend map
/// and its manager is told to mark itself terminated with the reported exit code.
///
/// # Panics
/// Panics if terminating the preexisting backends fails.
pub async fn new(runtime: Arc<Box<dyn Runtime>>, state_store: StateStore, ip: IpAddr) -> Self {
    let state_store = Arc::new(Mutex::new(state_store));
    let backends: Arc<DashMap<BackendName, Arc<BackendManager>>> = Arc::default();

    #[allow(clippy::unwrap_used)]
    Self::terminate_preexisting_backends(runtime.clone(), state_store.clone())
        .await
        .expect("Failed to terminate all preexisting backends! Locks may be violated, Drone aborting startup.");

    // The listener task needs its own handles because it outlives this function.
    let listener_runtime = runtime.clone();
    let listener_backends = backends.clone();
    let backend_event_listener = GuardHandle::new(async move {
        let mut events = listener_runtime.events();
        while let Some(event) = events.next().await {
            // Events for backends that are not in the map are ignored.
            let Some((_, manager)) = listener_backends.remove(&event.backend_id) else {
                continue;
            };

            tracing::info!(
                backend_id = event.backend_id.as_value(),
                // -1 is logged when the event carries no exit code.
                exit_code = event.exit_code.unwrap_or(-1),
                "Backend terminated.",
            );

            if let Err(err) = manager.mark_terminated(event.exit_code) {
                tracing::error!(?err, "Error marking backend as terminated.");
            }
        }

        tracing::info!("Backend event listener stopped.");
    });

    Self {
        runtime,
        state_store,
        backends,
        ip,
        _backend_event_listener: backend_event_listener,
    }
}
/// Force-terminates every backend the state store still reports as active.
///
/// For each such backend this:
/// 1. registers a hard-terminating state,
/// 2. asks the runtime to terminate it, retrying up to `MAX_TERMINATE_ATTEMPTS`
///    times with exponential backoff between attempts,
/// 3. registers a terminated state, whether or not the runtime call ever succeeded.
///
/// All backends are processed concurrently.
///
/// # Errors
/// Returns an error if the active backends cannot be read from the state store.
///
/// # Panics
/// Panics if the state store mutex is poisoned or if a state event cannot be
/// registered; the panic message includes the underlying error.
async fn terminate_preexisting_backends(
    runtime: Arc<Box<dyn Runtime>>,
    state_store: Arc<Mutex<StateStore>>,
) -> Result<()> {
    /// Upper bound on runtime terminate calls per backend.
    const MAX_TERMINATE_ATTEMPTS: u32 = 10;

    let backends = state_store
        .lock()
        .expect("State store lock poisoned.")
        .active_backends()?;

    if !backends.is_empty() {
        tracing::info!(?backends, "Terminating preexisting backends");
    }

    let mut tasks = vec![];
    for (backend_id, state) in backends {
        let runtime = runtime.clone();
        let state_store = state_store.clone();
        let state = state.clone();

        tasks.push(async move {
            // The guard is a temporary, so the lock is released before any await below.
            state_store
                .lock()
                .expect("State store lock poisoned.")
                .register_event(
                    &backend_id,
                    &state.to_hard_terminating(TerminationReason::KeyExpired),
                    Utc::now(),
                )
                .unwrap_or_else(|err| {
                    panic!(
                        "Failed to register backend terminating for backend {:?}: {:?}",
                        backend_id, err
                    )
                });

            let mut backoff = ExponentialBackoff::default();
            let mut success = false;
            for attempt in 1..=MAX_TERMINATE_ATTEMPTS {
                match runtime.terminate(&backend_id, true).await {
                    Ok(_) => {
                        success = true;
                        break;
                    }
                    Err(err) => {
                        tracing::warn!(
                            ?err,
                            ?backend_id,
                            ?attempt,
                            "Attempt failed to terminate backend"
                        );
                        // Nothing follows the last attempt, so waiting again would
                        // only delay startup.
                        if attempt < MAX_TERMINATE_ATTEMPTS {
                            backoff.wait().await;
                        }
                    }
                }
            }

            if !success {
                tracing::warn!(
                    ?backend_id,
                    "Failed to terminate backend after {} attempts. Marking terminated anyways.",
                    MAX_TERMINATE_ATTEMPTS
                );
            }

            state_store
                .lock()
                .expect("State store lock poisoned.")
                .register_event(&backend_id, &state.to_terminated(None), Utc::now())
                .unwrap_or_else(|err| {
                    panic!(
                        "Failed to register backend termination for backend {:?}: {:?}",
                        backend_id, err
                    )
                });
        });
    }

    join_all(tasks).await;
    Ok(())
}
/// Registers `listener` with the state store and returns the store's result.
///
/// # Panics
/// Panics if the state store mutex is poisoned.
pub fn register_listener<F>(&self, listener: F) -> Result<()>
where
    F: Fn(BackendStateMessage) + Send + Sync + 'static,
{
    let lock_result = self.state_store.lock();
    lock_result
        .expect("State store lock poisoned.")
        .register_listener(listener)
}
/// Forwards the acknowledgement of `event_id` to the state store and returns its result.
///
/// # Panics
/// Panics if the state store mutex is poisoned.
pub fn ack_event(&self, event_id: BackendEventId) -> Result<()> {
    let lock_result = self.state_store.lock();
    lock_result
        .expect("State store lock poisoned.")
        .ack_event(event_id)
}
/// Applies `action` to the backend named `backend_id`.
///
/// * `Spawn` creates a `BackendManager` whose state-change callback registers each
///   new state in the state store, and inserts that manager into the backend map.
/// * `Terminate` asks the backend's manager to terminate. If no manager is known
///   for the backend, the runtime is asked to terminate it directly and a
///   terminated state with reason `Lost` is registered instead.
///
/// # Errors
/// Returns an error if, on the unknown-backend path, the runtime terminate call
/// or the state registration fails.
///
/// # Panics
/// Panics if the state store mutex is poisoned.
pub async fn apply_action(
    &self,
    backend_id: &BackendName,
    action: &BackendAction,
) -> Result<()> {
    match action {
        BackendAction::Spawn {
            executable,
            key,
            static_token,
        } => {
            // The callback owns its own handles so the manager can keep it around.
            let store = self.state_store.clone();
            let id = backend_id.clone();
            let callback = move |state: &BackendState| {
                let now = Utc::now();
                store
                    .lock()
                    .expect("State store lock poisoned.")
                    .register_event(&id, state, now)?;
                Ok(())
            };

            let manager = BackendManager::new(
                backend_id.clone(),
                executable.clone(),
                BackendState::default(),
                self.runtime.clone(),
                callback,
                self.ip,
                key.clone(),
                static_token.clone(),
            );

            tracing::info!(backend_id = backend_id.as_value(), "Inserting backend.");
            self.backends.insert(backend_id.clone(), manager);
        }
        BackendAction::Terminate { kind, reason } => {
            tracing::info!("Terminating backend {}.", backend_id);

            // Clone the manager handle out of the map entry right away so the map
            // guard is gone before anything is awaited.
            let existing = self.backends.get(backend_id).map(|entry| entry.clone());

            let Some(manager) = existing else {
                tracing::warn!(backend_id = backend_id.as_value(), "Backend not found when handling terminate action (assumed terminated).");
                self.runtime.terminate(backend_id, true).await?;

                let lost_state = BackendState::Terminated {
                    last_status: BackendStatus::Ready,
                    termination: None,
                    reason: Some(TerminationReason::Lost),
                    exit_code: None,
                };
                self.state_store
                    .lock()
                    .expect("State store lock poisoned.")
                    .register_event(backend_id, &lost_state, Utc::now())?;

                return Ok(());
            };

            manager.terminate(*kind, *reason).await;
        }
    }

    Ok(())
}
}