use crate::support::*;
#[derive(Clone)]
pub struct Processes {
pub(crate) core: LashCore,
}
impl Processes {
fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
self.core
.env
.process_registry
.as_ref()
.cloned()
.ok_or_else(|| {
EmbedError::Plugin(lash_core::PluginError::Session(
"process registry is unavailable in this runtime".to_string(),
))
})
}
fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
}
fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
let effect_id = command.effect_id();
lash_core::RuntimeInvocation::effect(
lash_core::runtime::RuntimeScope::new("runtime"),
effect_id.clone(),
lash_core::RuntimeEffectKind::Process,
effect_id,
)
}
async fn run_command(
&self,
command: lash_core::ProcessCommand,
scoped_effect_controller: ScopedEffectController<'_>,
) -> Result<lash_core::ProcessEffectOutcome> {
let registry = self.registry()?;
let invocation = Self::process_invocation(&command);
let outcome = scoped_effect_controller
.controller()
.execute_effect(
lash_core::RuntimeEffectEnvelope::new(
invocation,
lash_core::RuntimeEffectCommand::process(command),
),
lash_core::RuntimeEffectLocalExecutor::processes(
registry,
self.core.env.process_work_driver.clone(),
),
)
.await
.map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
match outcome {
lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
_ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
"process effect returned non-process outcome".to_string(),
))),
}
}
pub async fn start(
&self,
request: lash_core::ProcessStartRequest,
scoped_effect_controller: ScopedEffectController<'_>,
) -> Result<lash_core::ProcessRecord> {
let env_ref = match request.env_spec.as_ref() {
Some(env_spec) => Some(
lash_core::runtime::persist_process_execution_env(
self.core.env.core.durability.process_env_store.as_ref(),
env_spec,
)
.await?,
),
None => None,
};
let grant = request.grant.clone();
let registration = request.into_registration(env_ref);
let command = lash_core::ProcessCommand::Start {
registration,
grant,
execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
};
let outcome = self
.run_command(command, scoped_effect_controller.clone())
.await?;
let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
return Err(EmbedError::Plugin(lash_core::PluginError::Session(
"process start returned the wrong outcome".to_string(),
)));
};
if let Some(driver) = self.core.work_driver.drivers().await.process {
driver.claim_and_run_pending("admin_process_start").await?;
}
Ok(*record)
}
pub async fn list(
&self,
filter: &lash_core::ProcessListFilter,
) -> Result<Vec<lash_core::ObservedProcess>> {
self.make_observer()?.list(filter).await.map_err(Into::into)
}
pub async fn list_granted_to(
&self,
session_scope: &lash_core::SessionScope,
filter: &lash_core::ProcessListFilter,
) -> Result<Vec<lash_core::ObservedProcess>> {
self.make_observer()?
.list_granted_to(session_scope, filter)
.await
.map_err(Into::into)
}
pub async fn list_originated_by(
&self,
session_scope: &lash_core::SessionScope,
filter: &lash_core::ProcessListFilter,
) -> Result<Vec<lash_core::ObservedProcess>> {
self.make_observer()?
.list_originated_by(session_scope, filter)
.await
.map_err(Into::into)
}
pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
Ok(self.make_observer()?.process(process_id).await)
}
pub async fn events(
&self,
process_id: &str,
after_sequence: u64,
) -> Result<Vec<lash_core::ObservedProcessEvent>> {
self.make_observer()?
.events_after(process_id, after_sequence)
.await
.map_err(Into::into)
}
pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
if let Some(driver) = self.core.env.process_work_driver.as_ref() {
return driver.await_terminal(process_id).await.map_err(Into::into);
}
lash_core::ProcessAwaiter::polling(self.registry()?)
.await_terminal(process_id)
.await
.map_err(Into::into)
}
pub async fn cancel(
&self,
process_id: &str,
scoped_effect_controller: ScopedEffectController<'_>,
) -> Result<lash_core::ProcessCancelSummary> {
let command = lash_core::ProcessCommand::Cancel {
process_id: process_id.to_string(),
reason: Some("requested by host".to_string()),
};
let outcome = self
.run_command(command, scoped_effect_controller.clone())
.await?;
let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
return Err(EmbedError::Plugin(lash_core::PluginError::Session(
"process cancel returned the wrong outcome".to_string(),
)));
};
Ok(lash_core::ProcessCancelSummary::from_record(*record))
}
pub async fn signal(
&self,
process_id: &str,
signal_name: impl Into<String>,
signal_id: impl Into<String>,
request: lash_core::ProcessEventAppendRequest,
scoped_effect_controller: ScopedEffectController<'_>,
) -> Result<lash_core::ProcessEvent> {
let signal_name = signal_name.into();
let event_type = request.event_type.clone();
let payload = request.payload.clone();
let command = lash_core::ProcessCommand::Signal {
process_id: process_id.to_string(),
signal_name: signal_name.clone(),
signal_id: signal_id.into(),
request,
};
let outcome = self
.run_command(command, scoped_effect_controller.clone())
.await?;
let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
return Err(EmbedError::Plugin(lash_core::PluginError::Session(
"process signal returned the wrong outcome".to_string(),
)));
};
let registry = self.registry()?;
let waiting_ordinal =
registry
.get_process(process_id)
.await
.and_then(|record| match record.wait {
Some(lash_core::WaitState {
kind:
lash_core::WaitKind::Signal {
name,
event_type: wait_event_type,
ordinal,
..
},
..
}) if name == signal_name && wait_event_type == event_type => Some(ordinal),
_ => None,
});
let ordinal = match waiting_ordinal {
Some(ordinal) => ordinal,
None => {
registry
.count_events_through(process_id, &event_type, event.sequence)
.await?
}
};
if ordinal > 0 {
let key = scoped_effect_controller
.controller()
.await_event_key(
&lash_core::ExecutionScope::process(process_id),
lash_core::AwaitEventWaitIdentity::process_signal(
process_id,
&signal_name,
ordinal,
),
)
.await
.map_err(|err| {
EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
})?;
let _ = scoped_effect_controller
.controller()
.resolve_await_event(&key, lash_core::Resolution::Ok(payload))
.await
.map_err(|err| {
EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
})?;
}
Ok(*event)
}
pub async fn session_snapshot(
&self,
session_id: impl Into<String>,
) -> Result<lash_core::ProcessWorkSnapshot> {
self.make_observer()?
.snapshot_for_session(session_id)
.await
.map_err(Into::into)
}
pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
self.make_observer()
}
pub async fn cancel_all(
&self,
scoped_effect_controller: ScopedEffectController<'_>,
) -> Result<Vec<lash_core::ProcessCancelSummary>> {
let running = self
.list(&lash_core::ProcessListFilter {
status: lash_core::ProcessStatusFilter::Running,
..lash_core::ProcessListFilter::default()
})
.await?;
let mut summaries = Vec::with_capacity(running.len());
for process in running {
summaries.push(
self.cancel(&process.process_id, scoped_effect_controller.clone())
.await?,
);
}
Ok(summaries)
}
pub async fn transfer(
&self,
from_scope: &lash_core::SessionScope,
to_scope: &lash_core::SessionScope,
process_ids: &[String],
) -> Result<()> {
self.registry()?
.transfer_handle_grants(from_scope, to_scope, process_ids)
.await
.map_err(Into::into)
}
pub async fn prune(&self, cutoff_epoch_ms: u64) -> Result<lash_core::ProcessPruneReport> {
self.registry()?
.prune_terminal_processes(cutoff_epoch_ms)
.await
.map_err(Into::into)
}
pub async fn request_abandon(
&self,
process_id: &str,
requested_by: impl Into<String>,
reason: Option<String>,
) -> Result<lash_core::ObservedProcess> {
let request = lash_core::AbandonRequest {
requested_by: requested_by.into(),
requested_at_ms: now_epoch_ms(),
reason,
};
self.registry()?
.request_process_abandon(process_id, request)
.await?;
self.get(process_id).await?.ok_or_else(|| {
EmbedError::Plugin(lash_core::PluginError::Session(format!(
"process `{process_id}` vanished after recording its abandon request"
)))
})
}
}
pub(crate) fn now_epoch_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|elapsed| elapsed.as_millis() as u64)
.unwrap_or(0)
}