use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, oneshot, mpsc};
use log::{debug, error};
use crate::types::mini_helpers::from_value;
use crate::core::client::VimClientHandle;
use crate::core::error::{Error, Result};
use crate::mo::{ListView, ViewManager};
use crate::types::structs::ManagedObjectReference;
use crate::types::enums::{TaskInfoStateEnum, MoTypesEnum};
use crate::types::vim_any::VimAny;
use vim_macros::vim_updatable;
use crate::core::pc_cache::{CacheAction, CacheManager, ObjectCache, ObjectCacheListener};
#[derive(Clone)]
pub struct TaskTracker {
client: VimClientHandle,
state: Arc<RwLock<SharedState>>,
}
impl TaskTracker {
pub fn new(client: VimClientHandle) -> Self {
Self {
client,
state: Arc::new(RwLock::new(SharedState {
list_view: None,
list_view_mor: None,
pending_tasks: HashMap::new(),
is_running: false,
shutdown_signal: None,
})),
}
}
pub async fn shutdown(&self) {
let shutdown_tx = {
let mut state = self.state.write().await;
state.shutdown_signal.take()
};
if let Some(tx) = shutdown_tx {
let _ = tx.send(());
}
}
pub async fn wait_any(&self, task: ManagedObjectReference) -> Result<Option<VimAny>> {
let (tx, rx) = oneshot::channel();
let task_id = task.value.clone();
let list_view = {
let mut state = self.state.write().await;
if state.list_view.is_none() {
let view_manager = self.client.service_content().view_manager.as_ref()
.ok_or_else(|| Error::internal("ViewManager not available".to_string()))?;
let vm = ViewManager::new(self.client.clone(), &view_manager.value);
let lv_mor = vm.create_list_view(Some(&[])).await?;
state.list_view = Some(ListView::new(self.client.clone(), &lv_mor.value));
state.list_view_mor = Some(lv_mor);
}
state.pending_tasks.insert(task_id.clone(), tx);
if !state.is_running {
let (shutdown_tx, shutdown_rx) = oneshot::channel();
state.shutdown_signal = Some(shutdown_tx);
state.is_running = true;
let tracker = self.clone();
tokio::spawn(async move {
if let Err(e) = tracker.background_loop(shutdown_rx).await {
error!("TaskTracker background loop failed: {}", e);
let (list_view_to_destroy, pending_to_notify) = {
let mut state = tracker.state.write().await;
state.is_running = false;
state.shutdown_signal = None;
let lv = state.list_view.take();
state.list_view_mor = None;
let pending: Vec<_> = state.pending_tasks.drain().map(|(_, tx)| tx).collect();
(lv, pending)
};
for tx in pending_to_notify {
let _ = tx.send(Err(Error::internal("TaskTracker loop terminated.".to_string())));
}
if let Some(lv) = list_view_to_destroy {
let _ = lv.destroy_view().await;
}
}
});
}
state.list_view.as_ref().unwrap().clone()
};
if let Err(e) = list_view.modify_list_view(Some(&[task.clone()]), Some(&[])).await {
let mut state = self.state.write().await;
state.pending_tasks.remove(&task_id);
return Err(e.into());
}
match rx.await {
Ok(res) => res,
Err(_) => Err(Error::internal("TaskTracker channel closed".to_string())),
}
}
pub async fn wait<T: miniserde::Deserialize + 'static>(&self, task: ManagedObjectReference) -> Result<T> {
let val_opt = self.wait_value(task).await?;
match val_opt {
Some(val) => {
let result: T = from_value(&val)?;
Ok(result)
},
None => {
let result: T = miniserde::json::from_str("null")
.map_err(|_| Error::internal("Failed to deserialize null into target type".to_string()))?;
Ok(result)
}
}
}
async fn wait_value(&self, task: ManagedObjectReference) -> Result<Option<miniserde::json::Value>> {
let any_opt = self.wait_any(task).await?;
match any_opt {
None => Ok(None),
Some(any) => {
let json_str = miniserde::json::to_string(&any);
let val: miniserde::json::Value = miniserde::json::from_str(&json_str)
.map_err(|_| Error::internal("Failed to roundtrip VimAny to json::Value".to_string()))?;
Ok(Some(val))
}
}
}
async fn background_loop(&self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
let (list_view_mor, list_view) = {
let state = self.state.read().await;
let mor = state.list_view_mor.as_ref().ok_or_else(|| Error::internal("ListView MOR is None in background loop".to_string()))?.clone();
let lv = state.list_view.as_ref().unwrap().clone();
(mor, lv)
};
let mut manager = CacheManager::new(self.client.clone())?;
let mut monitor = manager.create_monitor()?;
let (comp_tx, mut comp_rx) = mpsc::unbounded_channel();
let listener = TaskListener { tx: comp_tx };
let obj_spec = obj_spec_for_view(list_view_mor);
let cache = ObjectCache::new_with_listener(Box::new(listener));
manager.add_cache(Box::new(cache), obj_spec).await?;
loop {
let wait_future = monitor.wait_updates(10);
tokio::select! {
_ = &mut shutdown_rx => {
debug!("Shutdown signal received");
break;
}
res = wait_future => {
match res {
Ok(Some(updates)) => {
if let Err(e) = manager.apply_updates(updates) {
error!("Failed to apply updates: {}", e);
}
while let Ok((task_id, result)) = comp_rx.try_recv() {
self.complete_task(&list_view, task_id, result).await;
}
}
Ok(None) => {}
Err(e) => return Err(e),
}
}
}
let (should_exit, list_view_to_destroy) = {
let mut state = self.state.write().await;
if state.pending_tasks.is_empty() {
state.is_running = false;
state.shutdown_signal = None;
let lv = state.list_view.take();
state.list_view_mor = None;
(true, lv)
} else {
(false, None)
}
};
if should_exit {
debug!("No pending tasks, exiting background loop");
manager.destroy().await?;
if let Some(lv) = list_view_to_destroy {
let _ = lv.destroy_view().await;
}
return Ok(());
}
}
debug!("Shutdown signal path: cleaning up");
let (list_view_to_destroy, pending_to_notify) = {
let mut state = self.state.write().await;
state.is_running = false;
state.shutdown_signal = None;
let lv = state.list_view.take();
state.list_view_mor = None;
let pending: Vec<_> = state.pending_tasks.drain().map(|(_, tx)| tx).collect();
(lv, pending)
};
for tx in pending_to_notify {
let _ = tx.send(Err(Error::internal("TaskTracker shutdown requested".to_string())));
}
manager.destroy().await?;
if let Some(lv) = list_view_to_destroy {
let _ = lv.destroy_view().await;
}
Ok(())
}
async fn complete_task(&self, list_view: &ListView, task_id: String, final_result: Result<Option<VimAny>>) {
let tx_opt = {
let mut state = self.state.write().await;
state.pending_tasks.remove(&task_id)
};
if let Some(tx) = tx_opt {
let _ = tx.send(final_result);
}
let task_mor = ManagedObjectReference {
r#type: MoTypesEnum::Task,
value: task_id
};
if let Err(e) = list_view.modify_list_view(Some(&[]), Some(&[task_mor])).await {
error!("Failed to remove completed task from ListView: {}", e);
}
}
}
vim_updatable!(
struct TaskUpdate: Task {
info = "info",
}
);
use crate::core::pc_helpers::{obj_spec_for_view};
struct SharedState {
list_view: Option<ListView>,
list_view_mor: Option<ManagedObjectReference>,
pending_tasks: HashMap<String, oneshot::Sender<Result<Option<VimAny>>>>,
is_running: bool,
shutdown_signal: Option<oneshot::Sender<()>>,
}
struct TaskListener {
tx: mpsc::UnboundedSender<(String, Result<Option<VimAny>>)>,
}
impl ObjectCacheListener<TaskUpdate> for TaskListener {
fn on_new(&mut self, task: &TaskUpdate) -> CacheAction {
self.check_task(task)
}
fn on_update(&mut self, task: &TaskUpdate) -> CacheAction {
self.check_task(task)
}
fn on_remove(&mut self, task: TaskUpdate) {
self.finish_task(task);
}
}
impl TaskListener {
fn check_task(&self, task: &TaskUpdate) -> CacheAction {
match task.info.state {
TaskInfoStateEnum::Success | TaskInfoStateEnum::Error => CacheAction::Evict,
_ => CacheAction::Keep,
}
}
fn finish_task(&mut self, task: TaskUpdate) {
let task_id = task.id.value.clone();
let result: Option<Result<Option<VimAny>>> = match task.info.state {
TaskInfoStateEnum::Success => {
Some(Ok(task.info.result))
}
TaskInfoStateEnum::Error => {
if task.info.cancelled {
Some(Err(Error::task_cancelled()))
} else {
match task.info.error {
None => Some(Err(Error::internal(
"Task failed but no error detail returned".to_string(),
))),
Some(error) => Some(Err(Error::task_failed(error))),
}
}
}
_ => {
if task.info.cancelled {
Some(Err(Error::task_cancelled()))
} else {
error!("Task {} removed from cache in unexpected state {:?}", task_id, task.info.state);
Some(Err(Error::internal(format!(
"Task removed in unexpected state: {:?}", task.info.state
))))
}
}
};
if let Some(r) = result {
let _ = self.tx.send((task_id, r));
}
}
}