use core::fmt;
use core::mem;
use crossbeam_channel::Sender;
use serde::Deserialize;
use serde::Serialize;
use tracing::{debug, trace};
use ibc_relayer_types::{
core::{ics02_client::events::NewBlock, ics24_host::identifier::ChainId},
Height,
};
use crate::chain::tracking::TrackingId;
use crate::event::IbcEventWithHeight;
use crate::util::lock::{LockExt, RwArc};
use crate::util::task::TaskHandle;
use crate::{event::source::EventBatch, object::Object};
use super::{WorkerCmd, WorkerId};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WorkerData {
Client { misbehaviour: bool, refresh: bool },
}
pub struct WorkerHandle {
id: WorkerId,
object: Object,
data: Option<WorkerData>,
tx: RwArc<Option<Sender<WorkerCmd>>>,
task_handles: Vec<TaskHandle>,
}
impl WorkerHandle {
pub fn new(
id: WorkerId,
object: Object,
data: Option<WorkerData>,
tx: Option<Sender<WorkerCmd>>,
task_handles: Vec<TaskHandle>,
) -> Self {
Self {
id,
object,
data,
tx: <RwArc<_>>::new_lock(tx),
task_handles,
}
}
pub fn try_send_command(&self, cmd: WorkerCmd) {
let res = if let Some(tx) = self.tx.acquire_read().as_ref() {
tx.send(cmd)
} else {
Ok(())
};
if res.is_err() {
debug!("dropping sender end for worker {} as the receiver was dropped when the worker task terminated", self.id);
*self.tx.acquire_write() = None;
}
}
pub fn send_events(
&self,
height: Height,
events: Vec<IbcEventWithHeight>,
chain_id: ChainId,
tracking_id: TrackingId,
) {
let batch = EventBatch {
chain_id,
height,
events,
tracking_id,
};
self.try_send_command(WorkerCmd::IbcEvents { batch });
}
pub fn send_new_block(&self, height: Height, new_block: NewBlock) {
self.try_send_command(WorkerCmd::NewBlock { height, new_block });
}
pub fn clear_pending_packets(&self) {
self.try_send_command(WorkerCmd::ClearPendingPackets);
}
pub fn shutdown(&self) {
for task in self.task_handles.iter() {
task.shutdown()
}
}
pub fn shutdown_and_wait(self) {
for task in self.task_handles.iter() {
task.shutdown()
}
}
pub fn is_stopped(&self) -> bool {
for task in self.task_handles.iter() {
if !task.is_stopped() {
return false;
}
}
true
}
pub fn shutdown_stopped_tasks(&self) -> bool {
if self.task_handles.iter().any(|t| t.is_stopped()) {
for task in self.task_handles.iter() {
task.shutdown();
}
return true;
}
false
}
pub fn join(mut self) {
let task_handles = mem::take(&mut self.task_handles);
trace!(worker = %self.object.short_name(), "worker::handle: waiting for worker loop to end");
for task in task_handles.into_iter() {
task.join()
}
trace!(worker = %self.object.short_name(), "worker::handle: waiting for worker loop to end: done");
}
pub fn id(&self) -> WorkerId {
self.id
}
pub fn object(&self) -> &Object {
&self.object
}
pub fn data(&self) -> Option<&WorkerData> {
self.data.as_ref()
}
}
impl Drop for WorkerHandle {
fn drop(&mut self) {
self.shutdown()
}
}
impl fmt::Debug for WorkerHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WorkerHandle")
.field("id", &self.id)
.field("object", &self.object)
.finish_non_exhaustive()
}
}