use std::convert::TryFrom;
use std::ops::Deref;
use bytes::{Buf, BufMut, BytesMut};
use crate::core::checkpoint::CheckpointHandle;
use crate::core::element::Serde;
use crate::core::function::InputSplit;
use crate::core::properties::Properties;
use crate::metrics::Tag;
#[derive(
Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, Default, Ord, PartialOrd,
)]
pub struct OperatorId(pub u32);
#[derive(
Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, Default, Ord, PartialOrd,
)]
pub struct JobId(pub u32);
impl From<OperatorId> for JobId {
fn from(operator_id: OperatorId) -> Self {
JobId(operator_id.0)
}
}
impl Deref for JobId {
type Target = u32;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, Default)]
pub struct TaskId {
pub(crate) job_id: JobId,
pub(crate) task_number: u16,
pub(crate) num_tasks: u16,
}
impl TaskId {
pub fn job_id(&self) -> JobId {
self.job_id
}
pub fn task_number(&self) -> u16 {
self.task_number
}
pub fn num_tasks(&self) -> u16 {
self.num_tasks
}
pub fn is_default(&self) -> bool {
self.job_id.0 == 0 && self.task_number == 0 && self.num_tasks == 0
}
pub fn to_tags(&self) -> Vec<Tag> {
vec![
Tag::new("job_id", self.job_id.0),
Tag::new("task_number", self.task_number),
]
}
}
impl Serde for TaskId {
fn capacity(&self) -> usize {
4 + 2 + 2
}
fn serialize(&self, bytes: &mut BytesMut) {
bytes.put_u32(self.job_id.0);
bytes.put_u16(self.task_number);
bytes.put_u16(self.num_tasks);
}
fn deserialize(bytes: &mut BytesMut) -> Self {
let job_id = bytes.get_u32();
let task_number = bytes.get_u16();
let num_tasks = bytes.get_u16();
TaskId {
job_id: JobId(job_id),
task_number,
num_tasks,
}
}
}
#[derive(Copy, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Default)]
pub(crate) struct ChannelKey {
pub(crate) source_task_id: TaskId,
pub(crate) target_task_id: TaskId,
}
impl ChannelKey {
pub fn to_tags(&self) -> Vec<Tag> {
vec![
Tag::new("source_job_id", self.source_task_id.job_id.0),
Tag::new("source_task_number", self.source_task_id.task_number),
Tag::new("target_job_id", self.target_task_id.job_id.0),
Tag::new("target_task_number", self.target_task_id.task_number),
]
}
}
impl Serde for ChannelKey {
fn capacity(&self) -> usize {
8 + 8
}
fn serialize(&self, bytes: &mut BytesMut) {
self.source_task_id.serialize(bytes);
self.target_task_id.serialize(bytes);
}
fn deserialize(bytes: &mut BytesMut) -> Self {
let source_task_id = TaskId::deserialize(bytes);
let target_task_id = TaskId::deserialize(bytes);
ChannelKey {
source_task_id,
target_task_id,
}
}
}
#[derive(
Copy, Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Default,
)]
pub struct CheckpointId(pub u64);
impl CheckpointId {
#[inline]
pub fn is_default(&self) -> bool {
self.0 == 0
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct OperatorDescriptor {
pub operator_id: OperatorId,
pub checkpoint_id: CheckpointId,
pub completed_checkpoint_id: Option<CheckpointId>,
pub checkpoint_handle: Option<CheckpointHandle>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct TaskDescriptor {
pub task_id: TaskId,
pub operators: Vec<OperatorDescriptor>,
pub input_split: InputSplit,
pub daemon: bool,
pub thread_id: String,
pub terminated: bool,
}
#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq)]
pub enum ManagerStatus {
Pending = 0,
Registered = 1,
Migration = 2,
Terminating = 3,
Terminated = 4,
}
impl ManagerStatus {
pub fn is_terminating(&self) -> bool {
match self {
ManagerStatus::Terminating => true,
_ => false,
}
}
pub fn is_terminated(&self) -> bool {
match self {
ManagerStatus::Terminated => true,
_ => false,
}
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum HeartBeatStatus {
Ok,
Panic,
End,
}
impl std::fmt::Display for HeartBeatStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HeartBeatStatus::Ok => write!(f, "ok"),
HeartBeatStatus::Panic => write!(f, "panic"),
HeartBeatStatus::End => write!(f, "end"),
}
}
}
impl<'a> TryFrom<&'a str> for HeartBeatStatus {
type Error = anyhow::Error;
fn try_from(value: &'a str) -> Result<Self, Self::Error> {
match value {
"ok" => Ok(HeartBeatStatus::Ok),
"panic" => Ok(HeartBeatStatus::Panic),
"end" => Ok(HeartBeatStatus::End),
_ => Err(anyhow!("unrecognized status: {}", value)),
}
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct WorkerManagerDescriptor {
pub status: ManagerStatus,
pub latest_heart_beat_ts: u64,
pub latest_heart_beat_status: HeartBeatStatus,
pub task_manager_id: String,
pub task_manager_address: String,
pub metrics_address: String,
pub web_address: String,
pub task_descriptors: Vec<TaskDescriptor>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct CoordinatorManagerDescriptor {
pub version: String,
pub application_id: String,
pub application_properties: Properties,
pub web_address: String,
pub metrics_address: String,
pub status: ManagerStatus,
pub v_cores: u32,
pub memory_mb: u32,
pub num_task_managers: u32,
pub uptime: u64,
pub startup_number: u64,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct ClusterDescriptor {
pub coordinator_manager: CoordinatorManagerDescriptor,
pub worker_managers: Vec<WorkerManagerDescriptor>,
}
impl ClusterDescriptor {
pub fn get_worker_manager(&self, task_id: &TaskId) -> Option<&WorkerManagerDescriptor> {
self.worker_managers
.iter()
.find(|worker_manager_descriptor| {
worker_manager_descriptor
.task_descriptors
.iter()
.find(|task_descriptor| task_descriptor.task_id.eq(task_id))
.is_some()
})
}
pub fn flush_coordinator_status(&mut self) {
let mut task_count = 0;
let mut terminated_count = 0;
for worker_manager in &self.worker_managers {
for task in &worker_manager.task_descriptors {
task_count += 1;
if task.terminated {
terminated_count += 1;
}
}
}
if task_count == terminated_count {
self.coordinator_manager.status = ManagerStatus::Terminated;
} else if terminated_count > 0 {
self.coordinator_manager.status = ManagerStatus::Terminating;
}
}
pub fn to_string(&self) -> String {
serde_json::to_string(self).unwrap()
}
}