use anyhow::Result;
use std::sync::Arc;
use tokio::sync::RwLock;
use brainwires_agents::task_manager::TaskManager;
use brainwires_core::{Task, TaskPriority, TaskStatus};
use crate::LanceDatabase;
use crate::stores::task_store::TaskStore;
/// Task manager that pairs an in-memory [`TaskManager`] with a [`TaskStore`]
/// so that task changes are written to the database as they happen.
pub struct PersistentTaskManager {
/// In-memory task manager; every query and mutation is delegated to it.
manager: TaskManager,
/// Store used to save, load and delete tasks.
store: TaskStore,
/// Conversation id passed to every store save; also used to load and delete
/// tasks when no plan id is set.
conversation_id: String,
/// Optional plan id. When set, `reload` and `clear` operate on the plan's
/// tasks instead of the conversation's, and new tasks are tagged with it.
plan_id: Option<String>,
}
impl PersistentTaskManager {
    /// Creates a manager for `conversation_id` and preloads the tasks stored
    /// for that conversation.
    ///
    /// Loading is best-effort: if the store read fails, the error is discarded
    /// and the manager starts with no tasks, so this currently always returns
    /// `Ok`.
    pub async fn new(db: Arc<LanceDatabase>, conversation_id: String) -> Result<Self> {
        let store = TaskStore::new(db);
        let manager = TaskManager::new();

        // A failed read is treated the same as "nothing stored yet".
        let stored = store
            .get_by_conversation(&conversation_id)
            .await
            .unwrap_or_default();
        if !stored.is_empty() {
            manager.load_tasks(stored).await;
        }

        Ok(Self {
            manager,
            store,
            conversation_id,
            plan_id: None,
        })
    }

    /// Creates a manager scoped to `plan_id` and preloads the tasks stored for
    /// that plan.
    ///
    /// `conversation_id` is still recorded and passed to every save. As in
    /// [`Self::new`], a failed store read is discarded and yields an empty
    /// manager.
    pub async fn new_for_plan(
        db: Arc<LanceDatabase>,
        conversation_id: String,
        plan_id: String,
    ) -> Result<Self> {
        let store = TaskStore::new(db);
        let manager = TaskManager::new();

        // A failed read is treated the same as "nothing stored yet".
        let stored = store.get_by_plan(&plan_id).await.unwrap_or_default();
        if !stored.is_empty() {
            manager.load_tasks(stored).await;
        }

        Ok(Self {
            manager,
            store,
            conversation_id,
            plan_id: Some(plan_id),
        })
    }

    /// Replaces the plan id used by later operations.
    ///
    /// Only the field is updated; tasks already in memory or in the store are
    /// not touched.
    pub fn set_plan_id(&mut self, plan_id: Option<String>) {
        // `plan_id` is already owned, so it is moved rather than cloned.
        self.plan_id = plan_id;
    }

    /// Returns the current plan id, if any.
    pub fn plan_id(&self) -> Option<&String> {
        self.plan_id.as_ref()
    }

    /// Saves `task`, first filling in the current plan id when the task
    /// carries none.
    ///
    /// `create_task` and `add_subtask` tag only the copy they persist, so the
    /// task held by the in-memory manager may have no plan id. Without this
    /// step, re-saving that in-memory task after a status change could write
    /// it back without its plan id. A plan id already present on the task is
    /// left alone.
    async fn persist_task(&self, mut task: Task) -> Result<()> {
        if task.plan_id.is_none() {
            task.plan_id = self.plan_id.clone();
        }
        self.store.save(&task, &self.conversation_id).await?;
        Ok(())
    }

    /// Persists the manager's current copy of `task_id`; does nothing if the
    /// manager no longer has that task.
    async fn persist_current(&self, task_id: &str) -> Result<()> {
        if let Some(task) = self.manager.get_task(task_id).await {
            self.persist_task(task).await?;
        }
        Ok(())
    }

    /// Persists a freshly created task, tagging it with the current plan id
    /// (which may be `None`); does nothing if the manager does not return it.
    async fn persist_created(&self, task_id: &str) -> Result<()> {
        if let Some(mut task) = self.manager.get_task(task_id).await {
            task.plan_id = self.plan_id.clone();
            self.store.save(&task, &self.conversation_id).await?;
        }
        Ok(())
    }

    /// Creates a task in the in-memory manager and saves it to the store.
    ///
    /// Returns the id assigned by the manager.
    ///
    /// # Errors
    /// Fails if the manager rejects the task or the store save fails. In the
    /// latter case the task already exists in memory.
    pub async fn create_task(
        &self,
        description: String,
        parent_id: Option<String>,
        priority: TaskPriority,
    ) -> Result<String> {
        let task_id = self
            .manager
            .create_task(description, parent_id, priority)
            .await?;
        self.persist_created(&task_id).await?;
        Ok(task_id)
    }

    /// Adds a subtask under `parent_id` in the in-memory manager and saves it
    /// to the store.
    ///
    /// Returns the id assigned by the manager.
    ///
    /// # Errors
    /// Fails if the manager rejects the subtask or the store save fails.
    pub async fn add_subtask(&self, parent_id: String, description: String) -> Result<String> {
        let task_id = self.manager.add_subtask(parent_id, description).await?;
        self.persist_created(&task_id).await?;
        Ok(task_id)
    }

    /// Starts `task_id` in the in-memory manager, then saves its new state.
    ///
    /// # Errors
    /// Fails if the manager call or the store save fails.
    pub async fn start_task(&self, task_id: &str) -> Result<()> {
        self.manager.start_task(task_id).await?;
        self.persist_current(task_id).await
    }

    /// Completes `task_id` with `summary`, then saves it together with every
    /// task whose status is `Completed`.
    ///
    /// The sweep over all completed tasks presumably exists because completing
    /// one task can complete others inside the manager (TODO confirm against
    /// `TaskManager::complete_task`).
    ///
    /// # Errors
    /// Fails if the manager call or any store save fails.
    pub async fn complete_task(&self, task_id: &str, summary: String) -> Result<()> {
        self.manager.complete_task(task_id, summary).await?;

        // The sweep below writes every `Completed` task, so the target is
        // saved here only when the sweep would not cover it. This avoids
        // writing the same task twice.
        if let Some(task) = self.manager.get_task(task_id).await {
            if task.status != TaskStatus::Completed {
                self.persist_task(task).await?;
            }
        }

        for task in self.manager.get_all_tasks().await {
            if task.status == TaskStatus::Completed {
                self.persist_task(task).await?;
            }
        }
        Ok(())
    }

    /// Marks `task_id` as failed with `error`, then saves its new state.
    ///
    /// # Errors
    /// Fails if the manager call or the store save fails.
    pub async fn fail_task(&self, task_id: &str, error: String) -> Result<()> {
        self.manager.fail_task(task_id, error).await?;
        self.persist_current(task_id).await
    }

    /// Records that `task_id` depends on `depends_on`, then saves `task_id`.
    ///
    /// # Errors
    /// Fails if the manager call or the store save fails.
    pub async fn add_dependency(&self, task_id: &str, depends_on: &str) -> Result<()> {
        self.manager.add_dependency(task_id, depends_on).await?;
        self.persist_current(task_id).await
    }

    /// Clears the in-memory manager and deletes the matching stored tasks:
    /// by plan when a plan id is set, otherwise by conversation.
    ///
    /// # Errors
    /// Fails if the store delete fails; the in-memory tasks are already
    /// cleared at that point.
    pub async fn clear(&self) -> Result<()> {
        self.manager.clear().await;
        match self.plan_id {
            Some(ref plan_id) => self.store.delete_by_plan(plan_id).await?,
            None => {
                self.store
                    .delete_by_conversation(&self.conversation_id)
                    .await?
            }
        };
        Ok(())
    }

    /// Saves every task currently held by the in-memory manager.
    ///
    /// # Errors
    /// Stops at, and returns, the first store save failure.
    pub async fn persist_all(&self) -> Result<()> {
        for task in self.manager.get_all_tasks().await {
            self.persist_task(task).await?;
        }
        Ok(())
    }

    /// Reads tasks back from the store (by plan when a plan id is set,
    /// otherwise by conversation) and hands them to the in-memory manager.
    ///
    /// Unlike the constructors, a store read failure is returned to the caller.
    ///
    /// # Errors
    /// Fails if the store read fails.
    pub async fn reload(&self) -> Result<()> {
        let stored = match self.plan_id {
            Some(ref plan_id) => self.store.get_by_plan(plan_id).await?,
            None => {
                self.store
                    .get_by_conversation(&self.conversation_id)
                    .await?
            }
        };
        self.manager.load_tasks(stored).await;
        Ok(())
    }

    /// Returns the in-memory manager's copy of `task_id`, if it has one.
    pub async fn get_task(&self, task_id: &str) -> Option<Task> {
        self.manager.get_task(task_id).await
    }

    /// Returns the tasks the in-memory manager reports as ready.
    pub async fn get_ready_tasks(&self) -> Vec<Task> {
        self.manager.get_ready_tasks().await
    }

    /// Returns the tasks the in-memory manager reports as roots.
    pub async fn get_root_tasks(&self) -> Vec<Task> {
        self.manager.get_root_tasks().await
    }

    /// Returns the in-memory manager's task tree for `root_id`.
    pub async fn get_task_tree(&self, root_id: Option<&str>) -> Vec<Task> {
        self.manager.get_task_tree(root_id).await
    }

    /// Returns every task held by the in-memory manager.
    pub async fn get_all_tasks(&self) -> Vec<Task> {
        self.manager.get_all_tasks().await
    }

    /// Returns the in-memory manager's tasks that have the given `status`.
    pub async fn get_tasks_by_status(&self, status: TaskStatus) -> Vec<Task> {
        self.manager.get_tasks_by_status(status).await
    }

    /// Returns the in-memory manager's task count.
    pub async fn count(&self) -> usize {
        self.manager.count().await
    }

    /// Returns the in-memory manager's statistics.
    pub async fn get_stats(&self) -> brainwires_agents::task_manager::TaskStats {
        self.manager.get_stats().await
    }

    /// Returns the in-memory manager's progress value for `task_id`.
    pub async fn get_progress(&self, task_id: &str) -> f64 {
        self.manager.get_progress(task_id).await
    }

    /// Returns the in-memory manager's overall progress value.
    pub async fn get_overall_progress(&self) -> f64 {
        self.manager.get_overall_progress().await
    }

    /// Returns the in-memory manager's textual rendering of the task tree.
    pub async fn format_tree(&self) -> String {
        self.manager.format_tree().await
    }
}
/// A [`PersistentTaskManager`] behind a tokio `RwLock` inside an `Arc`, for
/// sharing one manager between owners.
pub type SharedPersistentTaskManager = Arc<RwLock<PersistentTaskManager>>;
#[cfg(test)]
mod tests {
// Placeholder: this module currently defines no tests.
}