use crate::hub::get_hub;
use crate::model::base::{self, DbBmc};
use crate::model::{
EndState, EntityAction, EntityType, EpochUs, Id, Inout, InoutBmc, InoutForCreate, InoutOnlyDisplay, ModelEvent,
ModelManager, RelIds, Result, RunningState, Stage, TypedContent,
};
use crate::support::time::now_micro;
use modql::SqliteFromRow;
use modql::field::{Fields, HasSqliteFields, SqliteField};
use modql::filter::ListOptions;
use uuid::Uuid;
/// Price breakdown made of a mandatory `cost` plus two optional cache-related amounts.
///
/// The three field names mirror the `cost`, `cost_cache_write` and `cost_cache_saving`
/// columns carried by `Task` / `TaskForUpdate`.
#[derive(Debug, Clone, Copy, Default)]
pub struct AiPrice {
pub cost: f64,
// Both cache amounts are optional; `Default` leaves them `None`.
pub cost_cache_write: Option<f64>,
pub cost_cache_saving: Option<f64>,
}
/// Full task record as read back by `TaskBmc::get` / `TaskBmc::list`.
///
/// Apart from the identity, timestamp and `run_id` fields, every field is optional.
/// The optional fields are filled in over time through `TaskForUpdate`.
#[derive(Debug, Clone, Fields, SqliteFromRow)]
pub struct Task {
// -- Identity and bookkeeping
pub id: Id,
pub uid: Uuid,
pub label: Option<String>,
pub ctime: EpochUs,
pub mtime: EpochUs,
// -- Owning run and position
pub run_id: Id,
pub idx: Option<i64>,
// -- Stage timestamps (task, data, ai, ai generation, output)
// `start`/`end` drive `From<&Task> for RunningState`; the `data_*` and `ai_*`
// stamps drive `data_running_state` / `ai_running_state`.
pub start: Option<EpochUs>,
pub data_start: Option<EpochUs>,
pub data_end: Option<EpochUs>,
pub ai_start: Option<EpochUs>,
pub ai_gen_start: Option<EpochUs>,
pub ai_gen_end: Option<EpochUs>,
pub ai_end: Option<EpochUs>,
pub output_start: Option<EpochUs>,
pub output_end: Option<EpochUs>,
pub end: Option<EpochUs>,
// -- End outcome
pub end_state: Option<EndState>,
// Set by `TaskBmc::set_end_error_no_end` to the id of the created error record.
pub end_err_id: Option<Id>,
pub end_skip_reason: Option<String>,
// -- Prompt, model and pricing information
pub prompt_size: Option<i64>,
pub model_ov: Option<String>, pub model_upstream: Option<String>,
pub pricing_model: Option<String>,
pub pricing_input: Option<f64>,
pub pricing_input_cached: Option<f64>,
pub pricing_output: Option<f64>,
// -- Token counters (written from `genai::chat::Usage` by `TaskForUpdate::from_usage`)
pub tk_prompt_total: Option<i64>,
pub tk_prompt_cached: Option<i64>,
pub tk_prompt_cache_creation: Option<i64>,
pub tk_completion_total: Option<i64>,
pub tk_completion_reasoning: Option<i64>,
// -- Cost figures (same names as the `AiPrice` fields)
pub cost: Option<f64>,
pub cost_cache_write: Option<f64>,
pub cost_cache_saving: Option<f64>,
// -- Input: `input_short` is always the short form; `input_uid` is only set when the
// full content was stored as a separate `Inout` record (see `TaskBmc::update_input`).
pub input_uid: Option<Uuid>,
pub input_short: Option<String>,
pub input_has_display: Option<bool>,
// -- Output: same layout as the input fields (see `TaskBmc::update_output`).
pub output_uid: Option<Uuid>,
pub output_short: Option<String>,
pub output_has_display: Option<bool>,
}
/// Payload accepted by `TaskBmc::create` and `TaskBmc::create_batch`.
#[derive(Debug, Clone, Fields, SqliteFromRow)]
pub struct TaskForCreate {
pub run_id: Id,
pub idx: i64,
pub label: Option<String>,
// Excluded from the derived field list; `TaskBmc::create` / `create_batch` take it out
// of the struct and persist it separately (short form on the task, full form as `Inout`).
#[field(skip)]
pub input_content: Option<TypedContent>,
}
/// Narrow view of a task holding only its identifiers; returned by `TaskBmc::get_ids`.
#[derive(Debug, Clone, Fields, SqliteFromRow)]
pub struct TaskForIds {
pub id: Id,
pub uid: Uuid,
// NOTE(review): `Task::idx` is `Option<i64>` and `TaskForCreate::idx` is `i64`, while this
// is a non-optional `i32` — confirm the narrower type is intended.
pub idx: i32,
}
impl TaskForCreate {
    /// Assembles a creation payload from its four parts, stored exactly as given.
    pub fn new(run_id: Id, idx: i64, label: Option<String>, input_content: Option<TypedContent>) -> Self {
        Self { run_id, idx, label, input_content }
    }

    /// Same as [`TaskForCreate::new`], except the input is supplied as a JSON value
    /// and converted with `TypedContent::from_value`.
    pub fn new_with_input(run_id: Id, idx: i64, label: Option<String>, input: &serde_json::Value) -> Self {
        let input_content = TypedContent::from_value(input);
        Self {
            run_id,
            idx,
            label,
            input_content: Some(input_content),
        }
    }
}
/// Partial-update payload for `TaskBmc::update`.
///
/// Every field is optional and `Default` yields all-`None`, so callers set only what
/// they want to change (`TaskForUpdate { start: ..., ..Default::default() }`).
/// `TaskBmc::update` turns it into fields with `sqlite_not_none_fields()`.
#[derive(Debug, Default, Clone, Fields, SqliteFromRow)]
pub struct TaskForUpdate {
pub label: Option<String>,
// -- Stage timestamps
pub start: Option<EpochUs>,
pub data_start: Option<EpochUs>,
pub data_end: Option<EpochUs>,
pub ai_start: Option<EpochUs>,
pub ai_gen_start: Option<EpochUs>,
pub ai_gen_end: Option<EpochUs>,
pub ai_end: Option<EpochUs>,
pub output_start: Option<EpochUs>,
pub output_end: Option<EpochUs>,
pub end: Option<EpochUs>,
// -- End outcome
pub end_state: Option<EndState>,
pub end_err_id: Option<Id>,
pub end_skip_reason: Option<String>,
// -- Prompt, model and pricing information
pub prompt_size: Option<i64>,
pub model_ov: Option<String>,
pub model_upstream: Option<String>,
pub pricing_model: Option<String>,
pub pricing_input: Option<f64>,
pub pricing_input_cached: Option<f64>,
pub pricing_output: Option<f64>,
// -- Token counters. These are `i32` (assigned straight from `genai::chat::Usage` in
// `from_usage`), whereas the matching `Task` fields are read back as `i64`.
pub tk_prompt_total: Option<i32>,
pub tk_prompt_cached: Option<i32>,
pub tk_prompt_cache_creation: Option<i32>,
pub tk_completion_total: Option<i32>,
pub tk_completion_reasoning: Option<i32>,
// -- Cost figures
pub cost: Option<f64>,
pub cost_cache_write: Option<f64>,
pub cost_cache_saving: Option<f64>,
// -- Input (written together by `TaskBmc::update_input`)
pub input_uid: Option<Uuid>,
pub input_short: Option<String>,
pub input_has_display: Option<bool>,
// -- Output (written together by `TaskBmc::update_output`)
pub output_uid: Option<Uuid>,
pub output_short: Option<String>,
pub output_has_display: Option<bool>,
}
impl TaskForUpdate {
    /// Builds an update that carries only the token counters found in `usage`.
    ///
    /// The totals come straight from `usage`; the cached, cache-creation and reasoning
    /// counters are `None` when the corresponding details section is missing.
    /// Every non-token field is left at its default (`None`).
    pub fn from_usage(usage: &genai::chat::Usage) -> Self {
        let prompt_details = usage.prompt_tokens_details.as_ref();
        let completion_details = usage.completion_tokens_details.as_ref();

        Self {
            tk_prompt_total: usage.prompt_tokens,
            tk_prompt_cached: prompt_details.and_then(|details| details.cached_tokens),
            tk_prompt_cache_creation: prompt_details.and_then(|details| details.cache_creation_tokens),
            tk_completion_total: usage.completion_tokens,
            tk_completion_reasoning: completion_details.and_then(|details| details.reasoning_tokens),
            ..Default::default()
        }
    }
}
/// Optional filter for `TaskBmc::list`; only its non-`None` fields are passed on to `base::list`.
#[derive(Debug, Default, Clone, Fields, SqliteFromRow)]
pub struct TaskFilter {
// Used by `TaskBmc::list_for_run` to restrict the listing to one run.
pub run_id: Option<Id>,
}
impl Task {
    /// Returns `true` when the task-level running state is `RunningState::Ended`,
    /// i.e. when `end` is set (see `From<&Task> for RunningState`).
    pub fn is_ended(&self) -> bool {
        matches!(RunningState::from(self), RunningState::Ended(_))
    }

    /// Returns `true` when the task's end state is `EndState::Skip`.
    pub fn has_skip(&self) -> bool {
        self.end_state == Some(EndState::Skip)
    }

    /// Returns `true` when `ai_running_state()` reports `RunningState::Running`.
    pub fn is_ai_running(&self) -> bool {
        self.ai_running_state() == RunningState::Running
    }

    /// Returns `true` when the task was skipped and `ai_start` was never recorded.
    pub fn is_skipped_before_ai(&self) -> bool {
        // Plain boolean expression instead of `if .. { true } else { false }`,
        // reusing `has_skip` so the "skipped" test lives in one place.
        self.has_skip() && self.ai_start.is_none()
    }
}
impl Task {
    /// Running state of the data stage, derived from `data_start`, `data_end` and `end_state`.
    ///
    /// - `data_end` set: `Ended`, carrying the task's `end_state`.
    /// - only `data_start` set: `Running`.
    /// - neither set: `NotScheduled` if the task already has an end state, else `Waiting`.
    #[allow(unused)]
    pub fn data_running_state(&self) -> RunningState {
        let data_started = self.data_start.is_some();
        let data_ended = self.data_end.is_some();

        match (data_ended, data_started, self.end_state) {
            (true, _, end_state) => RunningState::Ended(end_state),
            (false, true, _) => RunningState::Running,
            (false, false, Some(_)) => RunningState::NotScheduled,
            (false, false, None) => RunningState::Waiting,
        }
    }

    /// Running state of the AI stage, derived from the `ai_*` timestamps and `end_state`.
    ///
    /// - generation started but not ended: `Ended(Cancel)` if the task has an end state,
    ///   otherwise `Running`.
    /// - generation started and ended: `Ended(Ok)` if both `ai_start` and `ai_end` are
    ///   set, otherwise `Ended(Err)`.
    /// - no generation stamps but `ai_start` and `ai_end` both set: `NotScheduled`.
    /// - anything else: `Unknown`.
    pub fn ai_running_state(&self) -> RunningState {
        let task_ended = self.end_state.is_some();
        let ai_stage_done = self.ai_start.is_some() && self.ai_end.is_some();
        let gen_started = self.ai_gen_start.is_some();
        let gen_ended = self.ai_gen_end.is_some();

        match (gen_started, gen_ended) {
            (true, false) if task_ended => RunningState::Ended(Some(EndState::Cancel)),
            (true, false) => RunningState::Running,
            (true, true) if ai_stage_done => RunningState::Ended(Some(EndState::Ok)),
            (true, true) => RunningState::Ended(Some(EndState::Err)),
            (false, false) if ai_stage_done => RunningState::NotScheduled,
            _ => RunningState::Unknown,
        }
    }
}
/// Task-level running state: `Ended` (with the task's `end_state`) once `end` is set,
/// `Running` once `start` is set, and `Waiting` otherwise.
impl From<&Task> for RunningState {
    fn from(value: &Task) -> Self {
        match (value.end.is_some(), value.start.is_some()) {
            (true, _) => RunningState::Ended(value.end_state),
            (false, true) => RunningState::Running,
            (false, false) => RunningState::Waiting,
        }
    }
}
/// Unit type that namespaces the task persistence functions (`create`, `update`, `get`, `list`, ...).
pub struct TaskBmc;
// Binds `TaskBmc` to the `task` table and to the `EntityType::Task` entity kind.
impl DbBmc for TaskBmc {
const TABLE: &'static str = "task";
const ENTITY_TYPE: EntityType = EntityType::Task;
}
impl TaskBmc {
    /// Creates one task and, when `task_c.input_content` is present, stores that input
    /// through [`TaskBmc::update_input`].
    ///
    /// Returns the id of the new task.
    pub fn create(mm: &ModelManager, mut task_c: TaskForCreate) -> Result<Id> {
        // `input_content` is `#[field(skip)]`, so it is detached and persisted separately.
        let input_content = task_c.input_content.take();
        let run_id = task_c.run_id;
        let task_fields = task_c.sqlite_not_none_fields();

        let id = base::create_with_rel_ids::<Self>(
            mm,
            task_fields,
            RelIds {
                run_id: Some(run_id),
                ..Default::default()
            },
        )?;

        if let Some(input_content) = input_content {
            Self::update_input(mm, id, input_content)?;
        }

        Ok(id)
    }

    /// Updates task `id` with the non-`None` fields of `task_u`.
    ///
    /// The task is read first to obtain its `run_id`, which is passed along as `RelIds`.
    /// Returns whatever count `base::update_with_rel_ids` reports.
    pub fn update(mm: &ModelManager, id: Id, task_u: TaskForUpdate) -> Result<usize> {
        let fields = task_u.sqlite_not_none_fields();
        let run_id = Self::get(mm, id)?.run_id;

        base::update_with_rel_ids::<Self>(
            mm,
            id,
            fields,
            RelIds {
                run_id: Some(run_id),
                ..Default::default()
            },
        )
    }

    /// Fetches the full task record for `id`.
    #[allow(unused)]
    pub fn get(mm: &ModelManager, id: Id) -> Result<Task> {
        base::get::<Self, _>(mm, id)
    }

    /// Lists tasks, optionally constrained by `list_options` and by the non-`None` fields of `filter`.
    pub fn list(mm: &ModelManager, list_options: Option<ListOptions>, filter: Option<TaskFilter>) -> Result<Vec<Task>> {
        let filter_fields = filter.map(|f| f.sqlite_not_none_fields());
        base::list::<Self, _>(mm, list_options, filter_fields)
    }

    /// Creates all `items` with one batch call and returns their ids.
    ///
    /// The short input fields (`input_uid`, `input_short`, `input_has_display`) are written
    /// as part of the task fields. Inputs that need a separate `Inout` record are collected
    /// and created with a single `InoutBmc::create_batch_with_rel_ids` call.
    ///
    /// The `RelIds::run_id` used for both batch calls is the `run_id` of the first item
    /// (`None` when `items` is empty).
    pub fn create_batch(mm: &ModelManager, mut items: Vec<TaskForCreate>) -> Result<Vec<Id>> {
        let run_id = items.first().map(|item| item.run_id);

        // Detach each input and pre-compute how it is split between task and inout.
        let input_updates: Vec<Option<TaskInputPersistence>> = items
            .iter_mut()
            .map(|task| task.input_content.take().map(TaskInputPersistence::from_typed_content))
            .collect();

        let items_fields = items
            .into_iter()
            .zip(input_updates.iter())
            .map(|(task, may_input_update)| {
                let mut fields = task.sqlite_not_none_fields();
                if let Some(input_update) = may_input_update {
                    if let Some(input_uid) = input_update.input_uid {
                        fields.push(SqliteField::new("input_uid", input_uid));
                    }
                    if let Some(input_short) = input_update.input_short.as_ref() {
                        fields.push(SqliteField::new("input_short", input_short.clone()));
                    }
                    if let Some(input_has_display) = input_update.input_has_display {
                        fields.push(SqliteField::new("input_has_display", input_has_display));
                    }
                }
                fields
            })
            .collect();

        let ids = base::batch_create_with_rel_ids::<Self>(
            mm,
            items_fields,
            RelIds {
                run_id,
                ..Default::default()
            },
        )?;

        // Resolve the task uid only for tasks that actually need an `Inout` record, so a
        // batch of short inputs does not pay one uid lookup per task.
        let mut inout_to_create: Vec<InoutForCreate> = Vec::new();
        for (id, may_input_update) in ids.iter().copied().zip(input_updates) {
            let Some(TaskInputPersistence {
                inout_uid: Some(uid),
                inout_typ,
                inout_content,
                inout_display,
                ..
            }) = may_input_update
            else {
                continue;
            };

            let task_uid = TaskBmc::get_uid(mm, id)?;
            inout_to_create.push(InoutForCreate {
                uid,
                task_uid,
                typ: inout_typ,
                content: inout_content,
                display: inout_display,
            });
        }

        if !inout_to_create.is_empty() {
            InoutBmc::create_batch_with_rel_ids(
                mm,
                inout_to_create,
                RelIds {
                    run_id,
                    ..Default::default()
                },
            )?;
        }

        Ok(ids)
    }
}
impl TaskBmc {
pub fn get_ids(mm: &ModelManager, id: Id) -> Result<TaskForIds> {
let ids = base::get::<Self, TaskForIds>(mm, id)?;
Ok(ids)
}
pub fn list_for_run(mm: &ModelManager, run_id: Id) -> Result<Vec<Task>> {
let filter = TaskFilter { run_id: Some(run_id) };
Self::list(mm, None, Some(filter))
}
pub fn set_end_error_no_end(
mm: &ModelManager,
task_id: Id,
stage: Option<Stage>,
error: &crate::error::Error,
) -> Result<()> {
use crate::model::{ContentTyp, ErrBmc, ErrForCreate};
let task = Self::get(mm, task_id)?;
let err_c = ErrForCreate {
stage,
run_id: Some(task.run_id),
task_id: Some(task_id),
typ: Some(ContentTyp::Text),
content: Some(error.to_string()),
};
let err_id = ErrBmc::create(mm, err_c)?;
let task_u = TaskForUpdate {
end_state: Some(EndState::Err),
end_err_id: Some(err_id),
..Default::default()
};
Self::update(mm, task_id, task_u)?;
Ok(())
}
pub fn cancel_all_not_ended_for_run(mm: &ModelManager, run_id: Id) -> Result<usize> {
let tasks_u = TaskForUpdate {
end_state: Some(EndState::Cancel),
end: Some(now_micro().into()), ..Default::default()
};
let table_name = Self::table_ref();
let update_fields = tasks_u.sqlite_not_none_fields();
let sql = format!(
"UPDATE {table_name} SET {} where run_id = ? AND end_state IS NULL",
update_fields.sql_setters()
);
let all_fields = update_fields.append(SqliteField::new("run_id", run_id));
let values = all_fields.values_as_dyn_to_sql_vec();
let db = mm.db();
let num = db.exec(&sql, &*values)?;
if num > 0 {
get_hub().publish_sync(ModelEvent {
entity: EntityType::Task,
action: EntityAction::Updated,
id: None,
rel_ids: RelIds {
run_id: Some(run_id),
..Default::default()
},
});
}
Ok(num)
}
}
impl TaskBmc {
    /// Returns the text to show for the task's input.
    ///
    /// Without an `input_uid` this is a copy of `input_short` (possibly `None`).
    /// With one, the linked `Inout` record is read: its `display` when
    /// `input_has_display` is true, otherwise its `content`. A failed lookup yields
    /// `Ok(None)` rather than an error.
    pub fn get_input_for_display(mm: &ModelManager, task: &Task) -> Result<Option<String>> {
        let Some(input_uid) = task.input_uid else {
            return Ok(task.input_short.clone());
        };

        let text = if task.input_has_display.unwrap_or_default() {
            InoutBmc::get_by_uid::<InoutOnlyDisplay>(mm, input_uid)
                .ok()
                .and_then(|inout| inout.display)
        } else {
            InoutBmc::get_by_uid::<Inout>(mm, input_uid)
                .ok()
                .and_then(|inout| inout.content)
        };

        Ok(text)
    }

    /// Returns the text to show for the task's output.
    ///
    /// Without an `output_uid` this is a copy of `output_short` (possibly `None`).
    /// With one, the linked `Inout` record is read: its `display` when
    /// `output_has_display` is true, otherwise its `content`. A failed lookup yields
    /// `Ok(None)` rather than an error.
    pub fn get_output_for_display(mm: &ModelManager, task: &Task) -> Result<Option<String>> {
        let Some(output_uid) = task.output_uid else {
            return Ok(task.output_short.clone());
        };

        let text = if task.output_has_display.unwrap_or_default() {
            InoutBmc::get_by_uid::<InoutOnlyDisplay>(mm, output_uid)
                .ok()
                .and_then(|inout| inout.display)
        } else {
            InoutBmc::get_by_uid::<Inout>(mm, output_uid)
                .ok()
                .and_then(|inout| inout.content)
        };

        Ok(text)
    }

    /// Stores `input_content` for task `id`.
    ///
    /// Nothing is written when `extract_short()` yields no short form. Otherwise the
    /// task gets `input_short`, and when `extract_short()` also reports more content,
    /// `input_uid` / `input_has_display` are set and an `Inout` record holding the full
    /// content is created.
    pub fn update_input(mm: &ModelManager, id: Id, input_content: TypedContent) -> Result<()> {
        // Read up front: supplies the `run_id` for the inout `RelIds`, and fails early
        // when the task does not exist.
        let task = TaskBmc::get(mm, id)?;

        let (Some(short), has_more) = input_content.extract_short() else {
            return Ok(());
        };

        TaskBmc::update(
            mm,
            id,
            TaskForUpdate {
                input_uid: has_more.then_some(input_content.uid),
                input_has_display: has_more.then(|| input_content.display.is_some()),
                input_short: Some(short),
                ..Default::default()
            },
        )?;

        if !has_more {
            return Ok(());
        }

        let task_uid = TaskBmc::get_uid(mm, id)?;
        let inout_fields = InoutForCreate {
            uid: input_content.uid,
            task_uid,
            typ: Some(input_content.typ),
            content: input_content.content,
            display: input_content.display,
        }
        .sqlite_not_none_fields();

        base::create_uid_included_with_rel_ids::<InoutBmc>(
            mm,
            inout_fields,
            RelIds {
                run_id: Some(task.run_id),
                task_id: Some(id),
                ..Default::default()
            },
        )?;

        Ok(())
    }

    /// Stores `output_content` for task `id`.
    ///
    /// Nothing is written when `extract_short()` yields no short form. Otherwise the
    /// task gets `output_short`, and when `extract_short()` also reports more content,
    /// `output_uid` / `output_has_display` are set and an `Inout` record holding the
    /// full content is created.
    pub fn update_output(mm: &ModelManager, id: Id, output_content: TypedContent) -> Result<()> {
        // Read up front: supplies the `run_id` for the inout `RelIds`, and fails early
        // when the task does not exist.
        let task = TaskBmc::get(mm, id)?;

        let (Some(short), has_more) = output_content.extract_short() else {
            return Ok(());
        };

        TaskBmc::update(
            mm,
            id,
            TaskForUpdate {
                output_uid: has_more.then_some(output_content.uid),
                output_has_display: has_more.then(|| output_content.display.is_some()),
                output_short: Some(short),
                ..Default::default()
            },
        )?;

        if !has_more {
            return Ok(());
        }

        let task_uid = TaskBmc::get_uid(mm, id)?;
        let inout_fields = InoutForCreate {
            uid: output_content.uid,
            task_uid,
            typ: Some(output_content.typ),
            content: output_content.content,
            display: output_content.display,
        }
        .sqlite_not_none_fields();

        base::create_uid_included_with_rel_ids::<InoutBmc>(
            mm,
            inout_fields,
            RelIds {
                run_id: Some(task.run_id),
                task_id: Some(id),
                ..Default::default()
            },
        )?;

        Ok(())
    }
}
/// Pre-computed split of one task input, used by `TaskBmc::create_batch`.
///
/// The `input_*` fields are written onto the task record; the `inout_*` fields describe
/// the separate `Inout` record, which is created only when `inout_uid` is `Some`.
#[derive(Debug)]
struct TaskInputPersistence {
// -- Written onto the task record
input_uid: Option<Uuid>,
input_short: Option<String>,
input_has_display: Option<bool>,
// -- Used to build the `InoutForCreate`
inout_uid: Option<Uuid>,
inout_typ: Option<crate::model::ContentTyp>,
inout_content: Option<String>,
inout_display: Option<String>,
}
impl TaskInputPersistence {
    /// Splits `input_content` into the task-side and inout-side values.
    ///
    /// - `extract_short()` yields no short form: every field is `None`.
    /// - short form only: just `input_short` is set.
    /// - short form plus more content: `input_short`, `input_uid` and `input_has_display`
    ///   are set, and the `inout_*` fields carry the uid, type, content and display.
    fn from_typed_content(input_content: TypedContent) -> Self {
        let (input_short, has_more) = match input_content.extract_short() {
            (Some(short), has_more) => (Some(short), has_more),
            // No short form: nothing is persisted, whatever the "more" flag says.
            (None, _) => (None, false),
        };

        if !has_more {
            return Self {
                input_uid: None,
                input_short,
                input_has_display: None,
                inout_uid: None,
                inout_typ: None,
                inout_content: None,
                inout_display: None,
            };
        }

        let uid = input_content.uid;
        Self {
            input_uid: Some(uid),
            input_short,
            input_has_display: Some(input_content.display.is_some()),
            inout_uid: Some(uid),
            inout_typ: Some(input_content.typ),
            inout_content: input_content.content,
            inout_display: input_content.display,
        }
    }
}
#[cfg(test)]
mod tests {
    type Result<T> = core::result::Result<T, Box<dyn std::error::Error>>;

    use super::*;
    use crate::hub::{Hub, HubEvent};
    use crate::model::{RunBmc, RunForCreate};
    use crate::support::time::now_micro;
    use modql::filter::OrderBy;
    use serde_json::json;

    /// Creates a run whose agent name is `label` and whose agent path is `path/{label}`.
    async fn create_run(mm: &ModelManager, label: &str) -> Result<Id> {
        let run_c = RunForCreate {
            parent_id: None,
            agent_name: Some(label.to_string()),
            agent_path: Some(format!("path/{label}")),
            has_task_stages: None,
            has_prompt_parts: None,
        };
        Ok(RunBmc::create(mm, run_c)?)
    }

    #[tokio::test]
    async fn test_model_task_bmc_create() -> Result<()> {
        // -- Setup
        let mm = ModelManager::new().await?;
        let run_id = create_run(&mm, "run-1").await?;

        // -- Exec
        let task_c = TaskForCreate {
            run_id,
            idx: 1,
            label: Some("Test Task".to_string()),
            input_content: None,
        };
        let id = TaskBmc::create(&mm, task_c)?;

        // -- Check
        assert_eq!(id.as_i64(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_model_task_bmc_update_simple() -> Result<()> {
        // -- Setup
        let mm = ModelManager::new().await?;
        let run_id = create_run(&mm, "run-1").await?;
        let task_c = TaskForCreate {
            run_id,
            idx: 1,
            label: Some("Test Task".to_string()),
            input_content: None,
        };
        let id = TaskBmc::create(&mm, task_c)?;

        // -- Exec
        let task_u = TaskForUpdate {
            start: Some(now_micro().into()),
            ..Default::default()
        };
        TaskBmc::update(&mm, id, task_u)?;

        // -- Check
        let task = TaskBmc::get(&mm, id)?;
        assert!(task.start.is_some());

        Ok(())
    }

    #[tokio::test]
    async fn test_model_task_bmc_list_simple() -> Result<()> {
        // -- Setup
        let mm = ModelManager::new().await?;
        let run_id = create_run(&mm, "run-1").await?;
        for i in 0..3 {
            let task_c = TaskForCreate {
                run_id,
                // One distinct index per task, as in the other list tests.
                idx: i + 1,
                label: Some(format!("label-{i}")),
                input_content: None,
            };
            TaskBmc::create(&mm, task_c)?;
        }

        // -- Exec
        let tasks: Vec<Task> = TaskBmc::list(&mm, Some(ListOptions::default()), None)?;

        // -- Check
        assert_eq!(tasks.len(), 3);
        let task = tasks.first().ok_or("Should have first item")?;
        assert_eq!(task.id, 1.into());
        assert_eq!(task.label, Some("label-0".to_string()));
        let task = tasks.get(2).ok_or("Should have 3 items")?;
        assert_eq!(task.id, 3.into());
        assert_eq!(task.label, Some("label-2".to_string()));

        Ok(())
    }

    #[tokio::test]
    async fn test_model_task_bmc_list_from_seed() -> Result<()> {
        // -- Setup
        let mm = ModelManager::new().await?;
        let run_id = create_run(&mm, "run-seed").await?;
        for i in 0..10 {
            let task_c = TaskForCreate {
                run_id,
                idx: i + 1,
                label: Some(format!("label-{i}")),
                input_content: None,
            };
            TaskBmc::create(&mm, task_c)?;
        }

        // -- Exec
        let tasks: Vec<Task> = TaskBmc::list(&mm, Some(ListOptions::default()), None)?;

        // -- Check
        assert_eq!(tasks.len(), 10);
        let task = tasks.first().ok_or("Should have first item")?;
        assert_eq!(task.id, 1.into());
        assert_eq!(task.label, Some("label-0".to_string()));
        let task = tasks.get(2).ok_or("Should have 3 items")?;
        assert_eq!(task.id, 3.into());
        assert_eq!(task.label, Some("label-2".to_string()));

        Ok(())
    }

    #[tokio::test]
    async fn test_model_task_bmc_list_order_by() -> Result<()> {
        // -- Setup
        let mm = ModelManager::new().await?;
        let run_id = create_run(&mm, "run-1").await?;
        for i in 0..3 {
            let task_c = TaskForCreate {
                run_id,
                idx: i + 1,
                label: Some(format!("label-{i}")),
                input_content: None,
            };
            TaskBmc::create(&mm, task_c)?;
        }

        // -- Exec: "!id" requests descending id order.
        let order_bys = OrderBy::from("!id");
        let list_options = ListOptions::from(order_bys);
        let tasks: Vec<Task> = TaskBmc::list(&mm, Some(list_options), None)?;

        // -- Check
        assert_eq!(tasks.len(), 3);
        let task = tasks.first().ok_or("Should have first item")?;
        assert_eq!(task.id, 3.into());
        assert_eq!(task.label, Some("label-2".to_string()));
        let task = tasks.get(2).ok_or("Should have third item")?;
        assert_eq!(task.id, 1.into());
        assert_eq!(task.label, Some("label-0".to_string()));

        Ok(())
    }

    #[tokio::test]
    async fn test_model_task_cancel_all_not_ended_for_run() -> Result<()> {
        // -- Setup
        let mm = ModelManager::new().await?;
        let run_id = create_run(&mm, "run-1").await?;
        for i in 0..3 {
            let task_c = TaskForCreate {
                run_id,
                // One distinct index per task, as in the other list tests.
                idx: i + 1,
                label: Some(format!("label-{i}")),
                input_content: None,
            };
            TaskBmc::create(&mm, task_c)?;
        }
        // End the first task normally so that only the other two remain open.
        TaskBmc::update(
            &mm,
            1.into(),
            TaskForUpdate {
                end: Some(now_micro().into()),
                end_state: Some(EndState::Ok),
                ..Default::default()
            },
        )?;
        // Counts the tasks of this run that have an `end` timestamp.
        let count_ends_fn = || -> Result<i32> {
            Ok(TaskBmc::list(&mm, None, Some(TaskFilter { run_id: Some(run_id) }))?
                .into_iter()
                .map(|t| t.end.map(|_| 1).unwrap_or_default())
                .sum::<i32>())
        };
        assert_eq!(count_ends_fn()?, 1);

        // -- Exec
        TaskBmc::cancel_all_not_ended_for_run(&mm, run_id)?;

        // -- Check
        assert_eq!(count_ends_fn()?, 3);
        let states: Vec<EndState> = TaskBmc::list(&mm, None, Some(TaskFilter { run_id: Some(run_id) }))?
            .into_iter()
            .filter_map(|t| t.end_state)
            .collect();
        assert_eq!(&format!("{states:?}"), "[Ok, Cancel, Cancel]");

        Ok(())
    }

    #[tokio::test]
    async fn test_model_task_update_publishes_model_event_with_run_id() -> Result<()> {
        // -- Setup
        let hub = Hub::new();
        let rx = hub.take_rx()?;
        let mm = ModelManager::new().await?;
        let run_id = create_run(&mm, "run-1").await?;
        let task_id = TaskBmc::create(
            &mm,
            TaskForCreate {
                run_id,
                idx: 1,
                label: Some("Test Task".to_string()),
                input_content: None,
            },
        )?;

        // -- Exec
        TaskBmc::update(
            &mm,
            task_id,
            TaskForUpdate {
                start: Some(now_micro().into()),
                ..Default::default()
            },
        )?;

        // -- Check
        let event = rx.recv().await?;
        match event {
            HubEvent::Model(evt) => {
                assert_eq!(evt.entity, EntityType::Task);
                assert_eq!(evt.action, EntityAction::Updated);
                assert_eq!(evt.id, Some(task_id));
                assert_eq!(evt.rel_ids.run_id, Some(run_id));
            }
            // The message names the variant this test actually expects.
            _ => return Err("Should receive HubEvent::Model".into()),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_model_task_bmc_create_batch_persists_input_fields() -> Result<()> {
        // -- Setup
        let mm = ModelManager::new().await?;
        let run_id = create_run(&mm, "run-batch").await?;
        let long_input = "x".repeat(80);
        let items = vec![
            TaskForCreate::new_with_input(run_id, 0, None, &json!("short input")),
            TaskForCreate::new_with_input(run_id, 1, None, &json!(long_input)),
        ];

        // -- Exec
        let ids = TaskBmc::create_batch(&mm, items)?;

        // -- Check
        assert_eq!(ids.len(), 2);
        let task_short = TaskBmc::get(&mm, ids[0])?;
        assert_eq!(task_short.input_short, Some("short input".to_string()));
        assert!(task_short.input_uid.is_none());
        let task_long = TaskBmc::get(&mm, ids[1])?;
        assert!(task_long.input_short.is_some());
        assert!(task_long.input_uid.is_some());
        let stored_input = TaskBmc::get_input_for_display(&mm, &task_long)?;
        assert_eq!(stored_input, Some("x".repeat(80)));

        Ok(())
    }

    #[tokio::test]
    async fn test_model_task_bmc_create_batch_creates_large_inputs_in_batch() -> Result<()> {
        // -- Setup
        let mm = ModelManager::new().await?;
        let run_id = create_run(&mm, "run-batch-large").await?;
        let long_input_a = "a".repeat(80);
        let long_input_b = "b".repeat(90);
        let items = vec![
            TaskForCreate::new_with_input(run_id, 0, None, &json!(long_input_a.clone())),
            TaskForCreate::new_with_input(run_id, 1, None, &json!(long_input_b.clone())),
        ];

        // -- Exec
        let ids = TaskBmc::create_batch(&mm, items)?;

        // -- Check
        assert_eq!(ids.len(), 2);
        let task_a = TaskBmc::get(&mm, ids[0])?;
        let task_b = TaskBmc::get(&mm, ids[1])?;
        let input_uid_a = task_a.input_uid.ok_or("Should have input_uid for first long input")?;
        let input_uid_b = task_b.input_uid.ok_or("Should have input_uid for second long input")?;
        let inout_a = InoutBmc::get_by_uid::<Inout>(&mm, input_uid_a)?;
        let inout_b = InoutBmc::get_by_uid::<Inout>(&mm, input_uid_b)?;
        assert_eq!(inout_a.content, Some(long_input_a));
        assert_eq!(inout_b.content, Some(long_input_b));

        Ok(())
    }
}