use std::sync::Arc;
use crate::config;
use crate::runtime::{Runtime, StreamWriter};
use crate::runnable::RunnableError;
use super::PregelExecutableTask;
pub struct PregelRunner {
runtime: Option<Arc<Runtime>>,
stream_writer: Option<StreamWriter>,
}
impl PregelRunner {
pub fn new(runtime: Option<Arc<Runtime>>) -> Self {
Self { runtime, stream_writer: None }
}
pub fn with_stream_writer(mut self, writer: StreamWriter) -> Self {
self.stream_writer = Some(writer);
self
}
pub async fn run_tasks(&self, tasks: &mut [PregelExecutableTask]) -> Result<(), RunnerError> {
if tasks.is_empty() {
return Ok(());
}
if tasks.len() == 1 {
let task = &mut tasks[0];
Self::execute_single_task(task, self.runtime.as_ref(), self.stream_writer.clone()).await?;
return Ok(());
}
for task in tasks.iter_mut() {
Self::execute_single_task(task, self.runtime.as_ref(), self.stream_writer.clone()).await?;
}
Ok(())
}
pub fn run_tasks_sync(&self, tasks: &mut [PregelExecutableTask]) -> Result<(), RunnerError> {
for task in tasks.iter_mut() {
Self::execute_single_task_sync(task, self.runtime.as_ref())?;
}
Ok(())
}
async fn execute_single_task(
task: &mut PregelExecutableTask,
runtime: Option<&Arc<Runtime>>,
stream_writer: Option<StreamWriter>,
) -> Result<(), RunnerError> {
let mut config = task.config.clone();
{
let configurable = config
.entry("configurable".to_string())
.or_insert_with(|| serde_json::json!({}));
if let Some(obj) = configurable.as_object_mut() {
obj.insert(
crate::constants::CONFIG_KEY_SEND.to_string(),
serde_json::json!(true),
);
}
}
let effective_runtime = if let Some(rt) = runtime {
if stream_writer.is_some() {
let mut new_rt = (**rt).clone();
new_rt.stream_writer = stream_writer;
Some(Arc::new(new_rt))
} else {
Some(rt.clone())
}
} else if stream_writer.is_some() {
Some(Arc::new(Runtime {
context: (),
store: None,
stream_writer,
previous: None,
execution_info: None,
server_info: None,
}))
} else {
None
};
let result = if let Some(ref rt) = effective_runtime {
config::with_runtime(config.clone(), rt.clone(), async {
task.proc.ainvoke(&task.input, &config).await
})
.await
} else {
task.proc.ainvoke(&task.input, &config).await
};
match result {
Ok(output) => {
if let Some(obj) = output.as_object() {
for (key, val) in obj {
task.writes.push((key.clone(), val.clone()));
}
}
}
Err(RunnableError::Interrupt(interrupt)) => {
return Err(RunnerError::Interrupt {
task_id: task.id.clone(),
interrupt,
});
}
Err(e) => {
return Err(RunnerError::TaskFailed(task.name.clone(), e.to_string()));
}
}
Ok(())
}
fn execute_single_task_sync(
task: &mut PregelExecutableTask,
runtime: Option<&Arc<Runtime>>,
) -> Result<(), RunnerError> {
let mut config = task.config.clone();
{
let configurable = config
.entry("configurable".to_string())
.or_insert_with(|| serde_json::json!({}));
if let Some(obj) = configurable.as_object_mut() {
obj.insert(
crate::constants::CONFIG_KEY_SEND.to_string(),
serde_json::json!(true),
);
}
}
let result = if let Some(rt) = runtime {
config::with_runtime_sync(config.clone(), rt.clone(), || {
task.proc.invoke(&task.input, &config)
})
} else {
task.proc.invoke(&task.input, &config)
};
match result {
Ok(output) => {
if let Some(obj) = output.as_object() {
for (key, val) in obj {
task.writes.push((key.clone(), val.clone()));
}
}
}
Err(RunnableError::Interrupt(interrupt)) => {
return Err(RunnerError::Interrupt {
task_id: task.id.clone(),
interrupt,
});
}
Err(e) => {
return Err(RunnerError::TaskFailed(task.name.clone(), e.to_string()));
}
}
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
pub enum RunnerError {
#[error("task '{0}' failed: {1}")]
TaskFailed(String, String),
#[error("graph interrupt")]
Interrupt {
task_id: String,
interrupt: crate::types::GraphInterrupt,
},
}