use crate::Result;
use crate::model::{EndState, Id, LogKind, ModelManager, RunBmc, RunForUpdate, RunStep, TaskBmc, TaskForUpdate};
use crate::runtime::{RtLog, Runtime};
use crate::support::time::now_micro;
use derive_more::From;
#[derive(Debug, From)]
pub struct RtStep<'a> {
runtime: &'a Runtime,
}
impl<'a> RtStep<'a> {
pub(super) fn new(runtime: &'a Runtime) -> Self {
Self { runtime }
}
fn mm(&self) -> &ModelManager {
self.runtime.mm()
}
fn rt_log(&self) -> RtLog<'_> {
RtLog::new(self.runtime)
}
}
impl<'a> RtStep<'a> {
pub async fn step_run_start(&self, run_id: Id) -> Result<Id> {
RunBmc::update(
self.mm(),
run_id,
RunForUpdate {
start: Some(now_micro().into()),
..Default::default()
},
)?;
self.rt_log()
.rec_log_no_msg(run_id, None, Some(RunStep::Start), None, Some(LogKind::RunStep))
.await?;
Ok(run_id)
}
pub async fn step_ba_start(&self, run_id: Id) -> Result<()> {
let run_u = RunForUpdate {
ba_start: Some(now_micro().into()),
..Default::default()
};
RunBmc::update(self.mm(), run_id, run_u)?;
self.rt_log()
.rec_log_no_msg(run_id, None, Some(RunStep::BaStart), None, Some(LogKind::RunStep))
.await?;
Ok(())
}
pub async fn step_ba_end(&self, run_id: Id) -> Result<()> {
let run_u = RunForUpdate {
ba_end: Some(now_micro().into()),
..Default::default()
};
RunBmc::update(self.mm(), run_id, run_u)?;
self.rt_log()
.rec_log_no_msg(run_id, None, Some(RunStep::BaEnd), None, Some(LogKind::RunStep))
.await?;
Ok(())
}
pub async fn step_tasks_start(&self, run_id: Id) -> Result<()> {
let run_u = RunForUpdate {
tasks_start: Some(now_micro().into()),
..Default::default()
};
RunBmc::update(self.mm(), run_id, run_u)?;
self.rt_log()
.rec_log_no_msg(run_id, None, Some(RunStep::TasksStart), None, Some(LogKind::RunStep))
.await?;
Ok(())
}
pub async fn step_tasks_end(&self, run_id: Id) -> Result<()> {
let run_u = RunForUpdate {
tasks_end: Some(now_micro().into()),
..Default::default()
};
RunBmc::update(self.mm(), run_id, run_u)?;
self.rt_log()
.rec_log_no_msg(run_id, None, Some(RunStep::TasksEnd), None, Some(LogKind::RunStep))
.await?;
Ok(())
}
pub async fn step_task_start(&self, run_id: Id, task_id: Id) -> Result<()> {
let task_u = TaskForUpdate {
start: Some(now_micro().into()),
..Default::default()
};
TaskBmc::update(self.mm(), task_id, task_u)?;
self.rt_log()
.rec_log_no_msg(
run_id,
Some(task_id),
Some(RunStep::TaskStart),
None,
Some(LogKind::RunStep),
)
.await?;
Ok(())
}
pub async fn step_task_schedule(&self, run_id: Id, task_id: Id) -> Result<()> {
self.step_task_start(run_id, task_id).await
}
pub async fn step_task_data_start(&self, run_id: Id, task_id: Id) -> Result<()> {
let task_u = TaskForUpdate {
data_start: Some(now_micro().into()),
..Default::default()
};
TaskBmc::update(self.mm(), task_id, task_u)?;
self.rt_log()
.rec_log_no_msg(
run_id,
Some(task_id),
Some(RunStep::TaskDataStart),
None,
Some(LogKind::RunStep),
)
.await?;
Ok(())
}
pub async fn step_task_data_end(&self, run_id: Id, task_id: Id) -> Result<()> {
let task_u = TaskForUpdate {
data_end: Some(now_micro().into()),
..Default::default()
};
TaskBmc::update(self.mm(), task_id, task_u)?;
self.rt_log()
.rec_log_no_msg(
run_id,
Some(task_id),
Some(RunStep::TaskDataEnd),
None,
Some(LogKind::RunStep),
)
.await?;
Ok(())
}
pub async fn step_task_ai_start(&self, run_id: Id, task_id: Id) -> Result<()> {
let task_u = TaskForUpdate {
ai_start: Some(now_micro().into()),
..Default::default()
};
TaskBmc::update(self.mm(), task_id, task_u)?;
self.rt_log()
.rec_log_no_msg(
run_id,
Some(task_id),
Some(RunStep::TaskAiStart),
None,
Some(LogKind::RunStep),
)
.await?;
Ok(())
}
pub async fn step_task_ai_gen_start(&self, run_id: Id, task_id: Id, prompt_size: i64) -> Result<()> {
let task_u = TaskForUpdate {
ai_gen_start: Some(now_micro().into()),
prompt_size: Some(prompt_size),
..Default::default()
};
TaskBmc::update(self.mm(), task_id, task_u)?;
self.rt_log()
.rec_log_no_msg(
run_id,
Some(task_id),
Some(RunStep::TaskAiStart),
None,
Some(LogKind::RunStep),
)
.await?;
Ok(())
}
pub async fn step_task_ai_gen_end(&self, run_id: Id, task_id: Id) -> Result<()> {
let task_u = TaskForUpdate {
ai_gen_end: Some(now_micro().into()),
..Default::default()
};
TaskBmc::update(self.mm(), task_id, task_u)?;
self.rt_log()
.rec_log_no_msg(
run_id,
Some(task_id),
Some(RunStep::TaskAiStart),
None,
Some(LogKind::RunStep),
)
.await?;
Ok(())
}
pub async fn step_task_ai_end(&self, run_id: Id, task_id: Id) -> Result<()> {
let task_u = TaskForUpdate {
ai_end: Some(now_micro().into()),
..Default::default()
};
TaskBmc::update(self.mm(), task_id, task_u)?;
self.rt_log()
.rec_log_no_msg(
run_id,
Some(task_id),
Some(RunStep::TaskAiEnd),
None,
Some(LogKind::RunStep),
)
.await?;
Ok(())
}
pub async fn step_task_output_start(&self, run_id: Id, task_id: Id) -> Result<()> {
let task_u = TaskForUpdate {
output_start: Some(now_micro().into()),
..Default::default()
};
TaskBmc::update(self.mm(), task_id, task_u)?;
self.rt_log()
.rec_log_no_msg(
run_id,
Some(task_id),
Some(RunStep::TaskOutputStart),
None,
Some(LogKind::RunStep),
)
.await?;
Ok(())
}
pub async fn step_task_output_end(&self, run_id: Id, task_id: Id) -> Result<()> {
let task_u = TaskForUpdate {
output_end: Some(now_micro().into()),
..Default::default()
};
TaskBmc::update(self.mm(), task_id, task_u)?;
self.rt_log()
.rec_log_no_msg(
run_id,
Some(task_id),
Some(RunStep::TaskOutputEnd),
None,
Some(LogKind::RunStep),
)
.await?;
Ok(())
}
pub async fn step_task_end_ok(&self, run_id: Id, task_id: Id) -> Result<()> {
let mm = self.mm();
let task = TaskBmc::get(mm, task_id)?;
let end_state = if task.end_state.is_none() {
Some(EndState::Ok)
} else {
None
};
let task_u = TaskForUpdate {
end: Some(now_micro().into()),
end_state,
..Default::default()
};
TaskBmc::update(mm, task_id, task_u)?;
self.rt_log()
.rec_log_no_msg(
run_id,
Some(task_id),
Some(RunStep::TaskEnd),
None,
Some(LogKind::RunStep),
)
.await?;
Ok(())
}
pub async fn step_task_end_err(&self, run_id: Id, task_id: Id, err: &crate::Error) -> Result<()> {
let mm = self.mm();
let task = TaskBmc::get(mm, task_id)?;
if task.end_err_id.is_none() {
TaskBmc::set_end_error_no_end(mm, task_id, None, err)?;
}
let task_u = TaskForUpdate {
end: Some(now_micro().into()),
..Default::default()
};
TaskBmc::update(mm, task_id, task_u)?;
self.rt_log()
.rec_log_no_msg(
run_id,
Some(task_id),
Some(RunStep::TaskEnd),
None,
Some(LogKind::RunStep),
)
.await?;
Ok(())
}
pub async fn step_aa_start(&self, run_id: Id) -> Result<()> {
let run_u = RunForUpdate {
aa_start: Some(now_micro().into()),
..Default::default()
};
RunBmc::update(self.mm(), run_id, run_u)?;
self.rt_log()
.rec_log_no_msg(run_id, None, Some(RunStep::AaStart), None, Some(LogKind::RunStep))
.await?;
Ok(())
}
pub async fn step_aa_end(&self, run_id: Id) -> Result<()> {
let run_u = RunForUpdate {
aa_end: Some(now_micro().into()),
..Default::default()
};
RunBmc::update(self.mm(), run_id, run_u)?;
self.rt_log()
.rec_log_no_msg(run_id, None, Some(RunStep::AaEnd), None, Some(LogKind::RunStep))
.await?;
Ok(())
}
pub async fn step_run_end_ok(&self, run_id: Id) -> Result<()> {
let mm = self.mm();
let run = RunBmc::get(mm, run_id)?;
let end_state = if run.end_state.is_none() {
Some(EndState::Ok)
} else {
None
};
let run_u = get_run_u_for_end(mm, run_id, end_state)?;
RunBmc::update(self.mm(), run_id, run_u)?;
self.rt_log()
.rec_log_no_msg(run_id, None, Some(RunStep::End), None, Some(LogKind::RunStep))
.await?;
Ok(())
}
pub async fn step_run_end_canceled(&self, run_id: Id) -> Result<()> {
let mm = self.mm();
let run = RunBmc::get(mm, run_id)?;
let end_state = if run.end_state.is_none() {
Some(EndState::Cancel)
} else {
None
};
let run_u = get_run_u_for_end(mm, run_id, end_state)?;
RunBmc::update(self.mm(), run_id, run_u)?;
TaskBmc::cancel_all_not_ended_for_run(mm, run_id)?;
self.rt_log()
.rec_log_no_msg(run_id, None, Some(RunStep::End), None, Some(LogKind::RunStep))
.await?;
Ok(())
}
pub async fn step_run_end_err(&self, run_id: Id, err: &crate::Error) -> Result<()> {
let mm = self.mm();
let run = RunBmc::get(mm, run_id)?;
if run.end_err_id.is_none() {
RunBmc::set_end_error(mm, run_id, None, err)?;
}
let run_u = get_run_u_for_end(mm, run_id, None)?;
RunBmc::update(mm, run_id, run_u)?;
TaskBmc::cancel_all_not_ended_for_run(mm, run_id)?;
self.rt_log()
.rec_log_no_msg(run_id, None, Some(RunStep::End), None, Some(LogKind::RunStep))
.await?;
Ok(())
}
}
fn get_run_u_for_end(mm: &ModelManager, run_id: Id, end_state: Option<EndState>) -> Result<RunForUpdate> {
let tasks = TaskBmc::list_for_run(mm, run_id)?;
let mut total_task_us = 0;
for task in tasks {
if let (Some(start), Some(end)) = (task.start, task.end) {
let dur = end.as_i64() - start.as_i64();
if dur > 0 {
total_task_us += dur;
}
}
}
let total_task_ms: i64 = (total_task_us as f64 / 1000.).round() as i64;
let run_u = RunForUpdate {
end: Some(now_micro().into()),
total_task_ms: Some(total_task_ms),
end_state,
..Default::default()
};
Ok(run_u)
}