use core::fmt;
use core::mem;

use crossbeam_channel::Sender;
use serde::Deserialize;
use serde::Serialize;
use tracing::{debug, trace};

use ibc_relayer_types::{
    core::{ics02_client::events::NewBlock, ics24_host::identifier::ChainId},
    Height,
};

use crate::chain::tracking::TrackingId;
use crate::event::IbcEventWithHeight;
use crate::util::lock::{LockExt, RwArc};
use crate::util::task::TaskHandle;
use crate::{event::source::EventBatch, object::Object};

use super::{WorkerCmd, WorkerId};

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WorkerData {
    Client { misbehaviour: bool, refresh: bool },
}
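
// With `#[serde(tag = "type")]`, the enum is internally tagged when serialized.
// For example, `WorkerData::Client { misbehaviour: true, refresh: false }`
// round-trips (e.g. via `serde_json`, not a dependency of this module) as:
//
//     {"type":"Client","misbehaviour":true,"refresh":false}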

/// A handle to a spawned worker, used to send it commands and shut it down,
/// together with the handles of its background tasks.
pub struct WorkerHandle {
    id: WorkerId,
    object: Object,
    data: Option<WorkerData>,
    tx: RwArc<Option<Sender<WorkerCmd>>>,
    task_handles: Vec<TaskHandle>,
}

impl WorkerHandle {
    pub fn new(
        id: WorkerId,
        object: Object,
        data: Option<WorkerData>,
        tx: Option<Sender<WorkerCmd>>,
        task_handles: Vec<TaskHandle>,
    ) -> Self {
        Self {
            id,
            object,
            data,
            tx: <RwArc<_>>::new_lock(tx),
            task_handles,
        }
    }

    /// Attempt to send a command to the worker. If the worker task has already
    /// terminated and dropped its receiver, the command is discarded and the
    /// sender end is dropped as well.
    pub fn try_send_command(&self, cmd: WorkerCmd) {
        // Hold the read lock only for the duration of the send, so the write
        // lock below is not contended in the common case.
        let res = if let Some(tx) = self.tx.acquire_read().as_ref() {
            tx.send(cmd)
        } else {
            Ok(())
        };

        // A send fails only if the receiver was dropped, i.e. the worker task
        // has terminated; drop the sender end accordingly.
        if res.is_err() {
            debug!("dropping sender end for worker {} as the receiver was dropped when the worker task terminated", self.id);
            *self.tx.acquire_write() = None;
        }
    }

    /// Send a batch of events to the worker.
    pub fn send_events(
        &self,
        height: Height,
        events: Vec<IbcEventWithHeight>,
        chain_id: ChainId,
        tracking_id: TrackingId,
    ) {
        let batch = EventBatch {
            chain_id,
            height,
            events,
            tracking_id,
        };

        self.try_send_command(WorkerCmd::IbcEvents { batch });
    }
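
    // For example, a caller holding an `EventBatch` from the event source can
    // forward its fields to the worker like so (`handle` and `batch` are
    // illustrative bindings):
    //
    //     handle.send_events(batch.height, batch.events, batch.chain_id, batch.tracking_id);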

    /// Send a [`NewBlock`] event to the worker.
    pub fn send_new_block(&self, height: Height, new_block: NewBlock) {
        self.try_send_command(WorkerCmd::NewBlock { height, new_block });
    }

    /// Instruct the worker to clear pending packets.
    pub fn clear_pending_packets(&self) {
        self.try_send_command(WorkerCmd::ClearPendingPackets);
    }

    /// Shutdown all worker tasks without waiting for them to terminate.
    pub fn shutdown(&self) {
        for task in self.task_handles.iter() {
            task.shutdown()
        }
    }

    /// Shutdown all worker tasks and wait for them to terminate.
    pub fn shutdown_and_wait(self) {
        // Send the shutdown signal to all tasks in parallel.
        for task in self.task_handles.iter() {
            task.shutdown()
        }
        // Dropping `self` then takes care of waiting for the tasks to terminate.
    }

    /// Check whether all of the worker's tasks have stopped.
    pub fn is_stopped(&self) -> bool {
        self.task_handles.iter().all(|task| task.is_stopped())
    }

    /// Check whether at least one of the worker's tasks has stopped.
    /// If so, shut down all remaining tasks and return `true`.
    pub fn shutdown_stopped_tasks(&self) -> bool {
        if self.task_handles.iter().any(|t| t.is_stopped()) {
            for task in self.task_handles.iter() {
                task.shutdown();
            }
            return true;
        }
        false
    }

    /// Wait for all of the worker's tasks to finish.
    pub fn join(mut self) {
        let task_handles = mem::take(&mut self.task_handles);
        trace!(worker = %self.object.short_name(), "worker::handle: waiting for worker loop to end");
        for task in task_handles.into_iter() {
            task.join()
        }
        trace!(worker = %self.object.short_name(), "worker::handle: waiting for worker loop to end: done");
    }

    /// Get the worker's id.
    pub fn id(&self) -> WorkerId {
        self.id
    }

    /// Get a reference to the worker's object.
    pub fn object(&self) -> &Object {
        &self.object
    }

    /// Get a reference to the worker handle's data.
    pub fn data(&self) -> Option<&WorkerData> {
        self.data.as_ref()
    }
}
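
// A sketch of the handle's lifecycle, assuming worker tasks that consume
// `WorkerCmd`s from the receiving end of the channel (`spawn_worker_tasks`
// and the `worker_id`/`object` values are hypothetical, for illustration):
//
//     let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded::<WorkerCmd>();
//     let task_handles = spawn_worker_tasks(cmd_rx);
//     let handle = WorkerHandle::new(worker_id, object, None, Some(cmd_tx), task_handles);
//
//     handle.send_new_block(height, new_block);
//     handle.clear_pending_packets();
//
//     // Signal shutdown to all tasks, then wait for them to terminate.
//     handle.shutdown_and_wait();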

// Dropping the handle sends shutdown signals to the background tasks in
// parallel, before waiting for all of them to terminate.
impl Drop for WorkerHandle {
    fn drop(&mut self) {
        self.shutdown()
    }
}

impl fmt::Debug for WorkerHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WorkerHandle")
            .field("id", &self.id)
            .field("object", &self.object)
            .finish_non_exhaustive()
    }
}