use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use crate::common::logger::*;
use crate::common::now_millis;
use crate::task::Task;
use crate::topology::TaskMsg;
use tempest_source::prelude::{Msg, MsgId};
/// Name of the synthetic source node of every pipeline.
///
/// Reserved: registering a task with this name panics. By default the first
/// task added to a `Pipeline` is connected to this node with an edge
/// `"root" -> task`.
static ROOT_NAME: &str = "root";

/// A directed edge between two task names, as `(upstream, downstream)`.
pub(crate) type Edge = (String, String);

/// One declared edge of the pipeline graph: `(upstream, downstream, visited)`.
///
/// The `bool` starts out `false` when the edge is declared and is set to `true`
/// once the edge has been visited by the per-message state tracking.
pub(crate) type MatrixRow = (&'static str, &'static str, bool);

/// Every declared edge of a pipeline, in declaration order.
pub(crate) type Matrix = Vec<MatrixRow>;
/// A directed acyclic graph of [`Task`]s connected by named edges.
///
/// Built with a fluent API: [`Pipeline::task`] registers tasks, [`Pipeline::edge`]
/// connects them and [`Pipeline::build`] validates the result. Unless
/// [`Pipeline::root`] was called first, the first task registered becomes the
/// root and is connected to the synthetic `"root"` node.
#[derive(Default)]
pub struct Pipeline {
    /// Name of the root task (`""` until one is set).
    pub(crate) root: &'static str,
    /// Registered tasks, keyed by [`Task::name`].
    pub(crate) tasks: HashMap<&'static str, Box<dyn Task>>,
    /// For each task, the names of the nodes with an edge pointing into it.
    pub(crate) ancestors: HashMap<&'static str, Vec<&'static str>>,
    /// For each node (including `"root"`), the names of the tasks it points to.
    pub(crate) descendants: HashMap<&'static str, Vec<&'static str>>,
    /// Every declared edge, in declaration order.
    pub(crate) matrix: Matrix,
}

impl Pipeline {
    /// Returns a copy of this pipeline's graph with no tasks.
    ///
    /// The root name, ancestor/descendant maps and edge matrix are cloned;
    /// `tasks` is left empty.
    pub(crate) fn runtime(&self) -> Self {
        Self {
            root: self.root,
            tasks: HashMap::new(),
            ancestors: self.ancestors.clone(),
            descendants: self.descendants.clone(),
            matrix: self.matrix.clone(),
        }
    }

    /// Returns the names of all registered tasks, in `HashMap` iteration order.
    pub(crate) fn names(&self) -> Vec<String> {
        self.tasks.keys().map(|&name| name.to_string()).collect()
    }

    /// Removes and returns the task registered under `name`, if any.
    pub(crate) fn remove(&mut self, name: &str) -> Option<Box<dyn Task>> {
        self.tasks.remove(name)
    }

    /// Borrows the task registered under `name`, if any.
    pub(crate) fn get(&self, name: &str) -> Option<&dyn Task> {
        self.tasks.get(name).map(|boxed| &**boxed as &dyn Task)
    }

    /// Mutably borrows the task registered under `name`, if any.
    // Kept alongside `get`; presumably unused within the crate at present.
    #[allow(dead_code)]
    pub(crate) fn get_mut(&mut self, name: &str) -> Option<&mut dyn Task> {
        self.tasks
            .get_mut(name)
            .map(|boxed| &mut **boxed as &mut dyn Task)
    }

    /// Returns the number of registered tasks.
    pub fn len(&self) -> usize {
        self.tasks.len()
    }

    /// Returns `true` when no tasks have been registered.
    pub fn is_empty(&self) -> bool {
        self.tasks.is_empty()
    }

    /// Sets the root task name and returns the pipeline (builder style).
    pub fn root(mut self, name: &'static str) -> Self {
        self.root = name;
        self
    }

    /// Registers `task` and returns the pipeline (builder style).
    ///
    /// If a task with the same name is already registered, the new one is
    /// silently dropped. If no root has been set yet, this task becomes the
    /// root and the edge `"root" -> task` is added.
    ///
    /// # Panics
    ///
    /// Panics if the task is named `"root"`, or under the conditions listed on
    /// [`Pipeline::edge`].
    pub fn task<T: Task + 'static>(mut self, task: T) -> Self {
        let name = task.name();
        if name == ROOT_NAME {
            panic!("Task.name \"root\" isn't allowed. Use a different name.");
        }
        // First registration wins.
        self.tasks.entry(name).or_insert_with(|| Box::new(task));
        if self.root.is_empty() {
            self.edge(ROOT_NAME, name).root(name)
        } else {
            self
        }
    }

    /// Depth-first search from `task` that returns `true` as soon as an edge
    /// back into the current DFS path (`stack`) is found, i.e. a cycle.
    ///
    /// `visited` accumulates every node seen across calls; `stack` holds the
    /// nodes on the current path.
    fn dfs_check(
        &self,
        task: &'static str,
        visited: &mut HashSet<&'static str>,
        stack: &mut HashSet<&'static str>,
    ) -> bool {
        visited.insert(task);
        stack.insert(task);
        if let Some(descendants) = self.descendants.get(task) {
            for &descendant in descendants {
                if !visited.contains(descendant) {
                    if self.dfs_check(descendant, visited, stack) {
                        return true;
                    }
                } else if stack.contains(descendant) {
                    return true;
                }
            }
        }
        stack.remove(task);
        false
    }

    /// Returns `true` if the edge graph contains a cycle, logging the task from
    /// which the cycle was found.
    fn cyclical(&self) -> bool {
        let mut visited: HashSet<&'static str> = HashSet::new();
        let mut stack: HashSet<&'static str> = HashSet::new();
        for &task in self.tasks.keys() {
            if !visited.contains(task) && self.dfs_check(task, &mut visited, &mut stack) {
                error!("Pipeline task is cyclical: {}", &task);
                return true;
            }
        }
        false
    }

    /// Adds the directed edge `left -> right` and returns the pipeline
    /// (builder style).
    ///
    /// `left` may be `"root"`; otherwise both ends must be registered tasks.
    /// Re-adding an existing edge leaves the graph unchanged.
    ///
    /// # Panics
    ///
    /// Panics if `left == right`, if either end is not a registered task, or if
    /// the graph becomes cyclic.
    pub fn edge(mut self, left: &'static str, right: &'static str) -> Self {
        if left == right {
            panic!(
                "Pipeline has the same value for left & right edges: {:?}",
                &left
            );
        }
        if left != ROOT_NAME && !self.tasks.contains_key(left) {
            panic!("Pipeline.tasks missing a task with the name {}", &left);
        }
        if !self.tasks.contains_key(right) {
            panic!("Pipeline.tasks missing a task with the name {}", &right);
        }
        let exists = self
            .matrix
            .iter()
            .any(|row| row.0 == left && row.1 == right);
        if !exists {
            self.matrix.push((left, right, false));
            self.descendants.entry(left).or_default().push(right);
            self.ancestors.entry(right).or_default().push(left);
        }
        if self.cyclical() {
            panic!("Pipeline contains a cycle: {:?}", &self.matrix);
        }
        self
    }

    /// Validates and returns the finished pipeline.
    ///
    /// # Panics
    ///
    /// Panics if no tasks were registered.
    pub fn build(self) -> Self {
        assert!(!self.tasks.is_empty(), "Pipeline didn't define any tasks.");
        self
    }

    /// Renders the edge matrix as a Graphviz `digraph` with a left-to-right
    /// layout, one `"left" -> "right";` statement per edge.
    pub fn to_graphviz(&self) -> String {
        let edges: Vec<String> = self
            .matrix
            .iter()
            .map(|(left, right, _)| format!("\t\"{}\" -> \"{}\";", left, right))
            .collect();
        format!("digraph G {{\nrankdir=LR;\n{}\n}}", edges.join("\n"))
    }
}
/// Ack status of one message sent along an edge: `(index, acked)`.
type MsgStatePendingRow = (usize, bool);

/// Progress of a single source message through a pipeline.
///
/// Holds a private copy of the pipeline's edge [`Matrix`] whose `visited` flags
/// are set as edges complete, the set of tasks whose incoming edges have all
/// been visited, and, per started edge, the ack status of each message on it.
#[derive(Debug, Default)]
pub struct PipelineMsgState {
    matrix: Matrix,
    task_visited: HashSet<String>,
    pending: HashMap<Edge, Vec<MsgStatePendingRow>>,
}

impl PipelineMsgState {
    /// Creates a fresh state over `matrix` with nothing visited or pending.
    pub fn new(matrix: Matrix) -> Self {
        PipelineMsgState {
            matrix,
            task_visited: HashSet::new(),
            pending: HashMap::new(),
        }
    }

    /// Returns the ack rows recorded for `edge`, or `None` if it was never started.
    pub fn get(&self, edge: &Edge) -> Option<&Vec<MsgStatePendingRow>> {
        // Pass `edge` as-is: `get(&edge)` would hand `HashMap::get` an `&&Edge`,
        // inferring the key type as `&Edge`, which `Edge` doesn't `Borrow`.
        self.pending.get(edge)
    }

    /// Marks the task `name` as visited.
    pub fn task_visit(&mut self, name: String) {
        self.task_visited.insert(name);
    }

    /// Starts tracking `size` messages on `edge`, indexed `0..size`, all un-acked.
    ///
    /// Replaces any rows previously recorded for `edge`.
    pub fn edge_start(&mut self, edge: Edge, size: usize) {
        self.pending
            .insert(edge, (0..size).map(|index| (index, false)).collect());
    }

    /// Marks `edge` visited in the matrix, then marks its downstream task
    /// visited if every matrix edge pointing into that task is visited.
    pub fn edge_visit(&mut self, edge: &Edge) {
        let mut incoming = 0;
        let mut incoming_visited = 0;
        for row in self.matrix.iter_mut() {
            if row.0 == edge.0 && row.1 == edge.1 {
                row.2 = true;
            }
            if row.1 == edge.1 {
                incoming += 1;
                if row.2 {
                    incoming_visited += 1;
                }
            }
        }
        // If no matrix row points into `edge.1`, both counts are 0 and the task
        // is still marked visited.
        if incoming == incoming_visited {
            self.task_visit(edge.1.clone());
        }
    }

    /// Visits `edge`, then recursively every edge reachable downstream of its
    /// target task in `pipeline`'s descendant map.
    // Presumably used when a task produces no output, so the branch below it
    // still counts toward completion; confirm with callers.
    pub fn task_dead_ends(&mut self, edge: &Edge, pipeline: &Pipeline) {
        self.edge_visit(edge);
        if let Some(descendants) = pipeline.descendants.get(edge.1.as_str()) {
            for &descendant in descendants {
                let next = (edge.1.clone(), descendant.to_string());
                self.task_dead_ends(&next, pipeline);
            }
        }
    }

    /// Records the ack of message `index` on `edge`.
    ///
    /// Returns `None` if `edge` was never started. Returns `Some(true)` once every
    /// message on the edge is acked, in which case the edge is also visited via
    /// [`PipelineMsgState::edge_visit`]. Returns `Some(false)` while some remain
    /// pending. An untracked `index` is ignored.
    pub fn edge_visit_index(&mut self, edge: &Edge, index: usize) -> Option<bool> {
        let pending = self.pending.get_mut(edge)?;
        for row in pending.iter_mut() {
            if row.0 == index {
                row.1 = true;
            }
        }
        let completed = pending.iter().all(|&(_, acked)| acked);
        if completed {
            self.edge_visit(edge);
        }
        Some(completed)
    }

    /// Returns `true` once every edge in the matrix has been visited.
    pub fn finished(&self) -> bool {
        self.matrix.iter().all(|row| row.2)
    }
}
/// Outcome of [`PipelineInflight::ack`].
#[derive(Debug)]
pub enum PipelineInflightStatus {
    /// The acked edge completed. The flag reports whether the edge's downstream
    /// task is now marked visited in the message state.
    AckEdge(bool),
    /// The acked edge completed and every edge of the pipeline is now visited.
    AckSource,
    /// The edge still has un-acked messages.
    PendingEdge,
    /// The message is past its configured timeout; the ack was not recorded.
    Timeout,
    /// The message, or the edge, is not being tracked.
    Removed,
}

/// `(timestamp, state)` for one in-flight message. `timestamp` is the value
/// given to [`PipelineInflight::root`] and is compared against `now_millis()`.
type MsgInflightState = (usize, PipelineMsgState);

/// Tracks the progress of every message currently flowing through a pipeline.
#[derive(Debug, Default)]
pub struct PipelineInflight {
    /// Optional timeout. It is added to a message's timestamp and compared with
    /// `now_millis()`, so it is presumably in milliseconds.
    msg_timeout: Option<usize>,
    map: HashMap<MsgId, MsgInflightState>,
}

impl PipelineInflight {
    /// Creates an empty tracker with the given optional message timeout.
    pub fn new(msg_timeout: Option<usize>) -> Self {
        PipelineInflight {
            msg_timeout,
            ..PipelineInflight::default()
        }
    }

    /// Returns the number of messages being tracked.
    pub fn size(&self) -> usize {
        self.map.len()
    }

    /// Borrows the tracking entry for `msg_id`, if any.
    pub fn get(&self, msg_id: &MsgId) -> Option<&MsgInflightState> {
        self.map.get(msg_id)
    }

    /// Mutably borrows the tracking entry for `msg_id`, if any.
    pub fn get_mut(&mut self, msg_id: &MsgId) -> Option<&mut MsgInflightState> {
        self.map.get_mut(msg_id)
    }

    /// Starts tracking `msg_id`, replacing any existing entry for the same id.
    pub fn root(&mut self, msg_id: MsgId, timestamp: usize, state: PipelineMsgState) {
        self.map.insert(msg_id, (timestamp, state));
    }

    /// Marks `edge` and every edge downstream of it visited for `msg_id`.
    /// Unknown ids are ignored.
    pub fn ack_dead_end(&mut self, msg_id: &MsgId, edge: &Edge, pipeline: &Pipeline) {
        if let Some((_timestamp, msg_state)) = self.map.get_mut(msg_id) {
            msg_state.task_dead_ends(edge, pipeline);
        }
    }

    /// Records the ack of message `index` on `edge` for `msg_id`.
    ///
    /// The timeout is checked first. If the message has expired, `Timeout` is
    /// returned and nothing is recorded.
    pub fn ack(&mut self, msg_id: &MsgId, edge: &Edge, index: usize) -> PipelineInflightStatus {
        let msg_timeout = self.msg_timeout;
        let (timestamp, msg_state) = match self.map.get_mut(msg_id) {
            Some((timestamp, msg_state)) => (*timestamp, msg_state),
            None => return PipelineInflightStatus::Removed,
        };
        if let Some(timeout) = msg_timeout {
            // Saturate so a very large configured timeout can't overflow the deadline.
            if timestamp.saturating_add(timeout) < now_millis() {
                return PipelineInflightStatus::Timeout;
            }
        }
        match msg_state.edge_visit_index(edge, index) {
            None => PipelineInflightStatus::Removed,
            Some(false) => PipelineInflightStatus::PendingEdge,
            Some(true) if msg_state.finished() => PipelineInflightStatus::AckSource,
            Some(true) => {
                PipelineInflightStatus::AckEdge(msg_state.task_visited.contains(&edge.1))
            }
        }
    }

    /// Stops tracking `msg_id`.
    pub fn clean_msg_id(&mut self, msg_id: &MsgId) {
        self.map.remove(msg_id);
    }

    /// Returns whether `msg_id` has visited every edge. Untracked ids count as
    /// finished.
    pub fn finished(&self, msg_id: &MsgId) -> bool {
        self.map
            .get(msg_id)
            .map_or(true, |(_ts, msg_state)| msg_state.finished())
    }
}
/// Per-task FIFO queues of [`TaskMsg`]s, keyed by task name.
#[derive(Debug, Default)]
pub struct PipelineAvailable {
    pub(crate) queue: HashMap<String, VecDeque<TaskMsg>>,
}

impl PipelineAvailable {
    /// Creates an empty queue for each name in `task_names`.
    pub fn new(task_names: Vec<String>) -> Self {
        PipelineAvailable {
            queue: task_names
                .into_iter()
                .map(|name| (name, VecDeque::new()))
                .collect(),
        }
    }

    /// Returns how many messages are queued for `name` (0 for unknown names).
    pub fn len(&self, name: &str) -> usize {
        self.queue.get(name).map_or(0, VecDeque::len)
    }

    /// Pops up to `count` messages (1 when `None`) from the front of `name`'s queue.
    ///
    /// Returns `None` for an unknown task name. Otherwise returns the popped
    /// messages in FIFO order; there may be fewer than requested, or none.
    pub(crate) fn pop(&mut self, name: &str, count: Option<usize>) -> Option<Vec<TaskMsg>> {
        let count = count.unwrap_or(1);
        self.queue.get_mut(name).map(|q| {
            // Clamp before draining so `count == 0` pops nothing.
            let take = count.min(q.len());
            q.drain(..take).collect()
        })
    }

    /// Appends `msg` to `name`'s queue and returns the new queue length.
    /// Returns `None` (and drops `msg`) if `name` has no queue.
    pub(crate) fn push(&mut self, name: &str, msg: TaskMsg) -> Option<usize> {
        self.queue.get_mut(name).map(|q| {
            q.push_back(msg);
            q.len()
        })
    }

    /// Returns the current queue length of every task.
    pub fn stats(&self) -> HashMap<String, isize> {
        // Iterator adaptors are lazy; `collect` is what actually walks the queues.
        // Queue lengths are assumed to fit in `isize`.
        self.queue
            .iter()
            .map(|(name, q)| (name.clone(), q.len() as isize))
            .collect()
    }
}
/// Per-task holding area that accumulates [`Msg`]s, grouped by [`MsgId`].
#[derive(Default, Debug)]
pub struct PipelineAggregate {
    holding: HashMap<String, HashMap<MsgId, Vec<Msg>>>,
}

impl PipelineAggregate {
    /// Creates an empty holding map for each name in `task_names`.
    pub fn new(task_names: Vec<String>) -> Self {
        PipelineAggregate {
            holding: task_names
                .into_iter()
                .map(|name| (name, HashMap::new()))
                .collect(),
        }
    }

    /// Appends `msgs` to whatever task `name` already holds for `msg_id`.
    ///
    /// Returns `Some(true)` on success. Returns `None` for an unknown task name,
    /// in which case `msgs` are dropped.
    pub fn hold(&mut self, name: &str, msg_id: MsgId, msgs: Vec<Msg>) -> Option<bool> {
        self.holding.get_mut(name).map(|held| {
            held.entry(msg_id).or_default().extend(msgs);
            true
        })
    }

    /// Removes and returns everything task `name` holds for `msg_id`.
    pub fn remove(&mut self, name: &str, msg_id: &MsgId) -> Option<Vec<Msg>> {
        self.holding.get_mut(name)?.remove(msg_id)
    }

    /// Drops whatever any task holds for `msg_id`.
    pub fn clean_msg_id(&mut self, msg_id: &MsgId) {
        for held in self.holding.values_mut() {
            held.remove(msg_id);
        }
    }

    /// Returns, per task, the number of distinct message ids it is holding.
    pub fn stats(&self) -> HashMap<String, isize> {
        // Iterator adaptors are lazy; `collect` is what actually walks the map.
        self.holding
            .iter()
            .map(|(name, held)| (name.clone(), held.len() as isize))
            .collect()
    }
}
/// Unit tests for the pipeline builder and the per-message ack bookkeeping.
#[cfg(test)]
mod tests {
    use actix::*;
    use tempest_source::prelude::SourceMsg;
    use crate::common::now_millis;
    use crate::metric::Metrics;
    use crate::pipeline::*;
    use crate::task;
    use crate::topology::{PipelineActor, TaskResponse};

    // Task names, shared by the test task impls and the edge declarations.
    static T1: &'static str = "T1";
    static T2: &'static str = "T2";
    static T3: &'static str = "T3";
    static T4: &'static str = "T4";
    static T5: &'static str = "T5";
    static T6: &'static str = "T6";
    static T7: &'static str = "T7";

    /// Builds a `SourceMsg` with id `[0, 0, id]`, body `[id]`, the current
    /// timestamp and `delivered: 0`.
    fn get_source_msg(id: u8) -> SourceMsg {
        SourceMsg {
            id: vec![0, 0, id.clone()],
            msg: vec![id.clone()],
            ts: now_millis(),
            delivered: 0,
        }
    }

    /// Builds the seven-task pipeline used by `test_pipeline`. Its edges are:
    /// root -> T1 (added implicitly because T1 is registered first),
    /// T1 -> {T2, T3}, {T2, T3} -> T4, T4 -> {T5, T6} and T5 -> {T6, T7}.
    fn get_pipeline() -> Pipeline {
        Pipeline::default()
            .task(T1::default())
            .task(T2::default())
            .task(T3::default())
            .task(T4::default())
            .task(T5::default())
            .task(T6::default())
            .task(T7::default())
            .edge(T1, T2)
            .edge(T1, T3)
            .edge(T2, T4)
            .edge(T3, T4)
            .edge(T4, T5)
            .edge(T4, T6)
            .edge(T5, T6)
            .edge(T5, T7)
            .build()
    }

    // Minimal task types; each one only reports its name.
    #[derive(Default, Debug)]
    pub struct T1 {}
    impl task::Task for T1 {
        fn name(&self) -> &'static str {
            T1
        }
    }
    #[derive(Default, Debug)]
    pub struct T2 {}
    impl task::Task for T2 {
        fn name(&self) -> &'static str {
            T2
        }
    }
    #[derive(Default, Debug)]
    pub struct T3 {}
    impl task::Task for T3 {
        fn name(&self) -> &'static str {
            T3
        }
    }
    #[derive(Default, Debug)]
    pub struct T4 {}
    impl task::Task for T4 {
        fn name(&self) -> &'static str {
            T4
        }
    }
    #[derive(Default, Debug)]
    pub struct T5 {}
    impl task::Task for T5 {
        fn name(&self) -> &'static str {
            T5
        }
    }
    #[derive(Default, Debug)]
    pub struct T6 {}
    impl task::Task for T6 {
        fn name(&self) -> &'static str {
            T6
        }
    }
    #[derive(Default, Debug)]
    pub struct T7 {}
    impl task::Task for T7 {
        fn name(&self) -> &'static str {
            T7
        }
    }

    /// A task that claims the reserved name "root"; registering it must panic.
    #[derive(Default, Debug)]
    pub struct TaskRoot {}
    impl task::Task for TaskRoot {
        fn name(&self) -> &'static str {
            "root"
        }
    }

    // T1 -> T2 -> T3 -> T1 closes a cycle, so the last `edge` call must panic.
    #[test]
    #[should_panic]
    fn test_pipeline_cycle() {
        let _ = Pipeline::default()
            .task(T1::default())
            .task(T2::default())
            .task(T3::default())
            .edge(T1, T2)
            .edge(T2, T3)
            .edge(T3, T1)
            .build();
    }

    // `build` rejects a pipeline with no tasks.
    #[test]
    #[should_panic]
    fn test_empty_pipeline() {
        let _ = Pipeline::default().build();
    }

    // Tasks may not use the reserved name "root".
    #[test]
    #[should_panic]
    fn test_pipeline_task_root() {
        let _ = Pipeline::default().task(TaskRoot::default()).build();
    }

    // An edge from a task to itself is rejected.
    #[test]
    #[should_panic]
    fn test_pipeline_same_edge() {
        let _ = Pipeline::default()
            .task(T1::default())
            .task(T2::default())
            .edge(T1, T1)
            .build();
    }

    // Edges must reference registered tasks.
    #[test]
    #[should_panic]
    fn test_pipeline_undefined_task_edge() {
        let _ = Pipeline::default()
            .task(T1::default())
            .task(T2::default())
            .edge("Y1", "Y2")
            .build();
    }

    // Drives one source message through the start of `get_pipeline()` by
    // calling the actor's handlers directly.
    #[test]
    fn test_pipeline() {
        // NOTE(review): `let _ = ...` drops the `System` at the end of this
        // statement; if an actix system must stay alive for the calls below,
        // bind it to `_sys` instead.
        let _ = System::new("Task");
        let pipeline = get_pipeline();
        // Assemble the actor by hand so its handlers can be invoked directly.
        let mut pipeline_act = PipelineActor {
            pipeline: pipeline.runtime(),
            inflight: PipelineInflight::new(None),
            available: PipelineAvailable::new(pipeline.names()),
            aggregate: PipelineAggregate::new(pipeline.names()),
            metrics: Metrics::default().named(vec!["pipeline"]),
        };
        let src_msg = get_source_msg(0);
        let src_msg_copy = src_msg.clone();
        // Feed the source message into the root of the pipeline.
        pipeline_act.task_root(src_msg_copy);
        // Its id [0, 0, 0] must now be tracked as in flight...
        let result = pipeline_act.inflight.get(&vec![0, 0, 0]);
        assert_eq!(result.is_some(), true);
        if let Some((_ts, msg_state)) = result {
            println!("{:?}", msg_state);
            // ...with exactly one pending message on the root -> T1 edge...
            let pending = msg_state.get(&("root".to_owned(), T1.to_owned()));
            assert_eq!(pending.is_some(), true);
            assert_eq!(pending.unwrap().len(), 1);
        };
        // ...and one message queued for T1.
        assert_eq!(pipeline_act.available.len(&T1.to_owned()), 1usize);
        // Ack message 0 on root -> T1, handing back two messages.
        let task_resp = TaskResponse::Ack(
            src_msg.id.clone(),
            ("root".to_string(), T1.to_string()),
            0,
            Some(vec![vec![1], vec![2]]),
        );
        pipeline_act.task_ack(task_resp);
        // Ack indices 0..2 on T1 -> T2, each handing back two messages.
        for index in 0..2 {
            let task_resp = TaskResponse::Ack(
                src_msg.id.clone(),
                (T1.to_string(), T2.to_string()),
                index,
                Some(vec![vec![1], vec![2]]),
            );
            pipeline_act.task_ack(task_resp);
        }
        // Ack indices 0..2 on T1 -> T3, each handing back three messages.
        for index in 0..2 {
            let task_resp = TaskResponse::Ack(
                src_msg.id.clone(),
                (T1.to_string(), T3.to_string()),
                index,
                Some(vec![vec![1], vec![2], vec![3]]),
            );
            pipeline_act.task_ack(task_resp);
        }
        // Ack indices 0..4 on T2 -> T4 with no messages.
        for index in 0..4 {
            let task_resp = TaskResponse::Ack(
                src_msg.id.clone(),
                (T2.to_string(), T4.to_string()),
                index,
                None,
            );
            pipeline_act.task_ack(task_resp);
        }
        // NOTE(review): this loop repeats the previous one exactly (T2 -> T4,
        // indices 0..4); confirm whether a different edge was intended.
        for index in 0..4 {
            let task_resp = TaskResponse::Ack(
                src_msg.id.clone(),
                (T2.to_string(), T4.to_string()),
                index,
                None,
            );
            pipeline_act.task_ack(task_resp);
        }
        // Ack indices 0..6 on T3 -> T4 with no messages.
        for index in 0..6 {
            let task_resp = TaskResponse::Ack(
                src_msg.id.clone(),
                (T3.to_string(), T4.to_string()),
                index,
                None,
            );
            pipeline_act.task_ack(task_resp);
        }
        // NOTE(review): nothing is asserted after the acks, so this section
        // only checks that `task_ack` does not panic.
    }
}