use actix::prelude::*;
use serde_derive::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::time::Duration;
use crate::common::logger::*;
use crate::common::now_millis;
use crate::metric::{self, Metrics};
use crate::pipeline::*;
use crate::service::server::TopologyServer;
use crate::source::*;
static TARGET_SOURCE_ACTOR: &str = "tempest::topology::SourceActor";
static TARGET_TOPOLOGY_ACTOR: &str = "tempest::topology::TopologyActor";
static TARGET_PIPELINE_ACTOR: &str = "tempest::topology::PipelineActor";
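/// Trait implemented by a user-defined topology: it supplies a `TopologyBuilder`
/// that wires a source builder, a pipeline, and runtime options together.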
pub trait Topology<SB: SourceBuilder> {
fn service() {}
fn builder() -> TopologyBuilder<SB>;
fn test_builder() -> TopologyBuilder<SB> {
Self::builder()
}
}
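/// Policy applied when a source message errors or times out inside the pipeline:
/// `None` drops the message, `BestEffort` acks it back to the source anyway,
/// and `Retry(n)` re-delivers it up to `n` times before dropping it.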
#[derive(Clone, Debug, PartialEq)]
pub enum TopologyFailurePolicy {
None,
BestEffort,
Retry(usize),
}
impl Default for TopologyFailurePolicy {
fn default() -> Self {
TopologyFailurePolicy::BestEffort
}
}
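/// Runtime configuration for a topology: name, host/port bindings, failure policy,
/// message timeout, metric flushing, and graceful shutdown settings.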
#[derive(Clone, Debug, Default, PartialEq)]
pub struct TopologyOptions {
pub name: String,
host_port: Option<String>,
agent_host_port: Option<String>,
failure_policy: Option<TopologyFailurePolicy>,
msg_timeout: Option<usize>,
pub metric_flush_interval: Option<u64>,
pub metric_targets: Vec<metric::MetricTarget>,
pub graceful_shutdown: Option<u64>,
}
impl TopologyOptions {
pub fn name(&mut self, name: &'static str) {
self.name = name.to_string();
}
pub fn failure_policy(&mut self, policy: TopologyFailurePolicy) {
self.failure_policy = Some(policy);
}
pub fn msg_timeout(&mut self, ms: usize) {
self.msg_timeout = Some(ms);
}
pub fn host_port(&mut self, host_port: String) {
self.host_port = Some(host_port);
}
pub fn agent_host_port(&mut self, host_port: String) {
self.agent_host_port = Some(host_port);
}
pub fn graceful_shutdown(&mut self, ms: u64) {
self.graceful_shutdown = Some(ms);
}
pub fn metric_flush_interval(&mut self, ms: u64) {
self.metric_flush_interval = Some(ms);
}
pub fn metric_target(&mut self, target: metric::MetricTarget) {
self.metric_targets.push(target);
}
}
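/// Builder used to describe a topology: its options, its `Pipeline`, and the
/// `SourceBuilder` that produces the source feeding it.
///
/// A minimal sketch of how a builder is typically assembled. `MySourceBuilder` is a
/// hypothetical type used only to illustrate the API shape; a real pipeline would
/// also declare its tasks and edges before being passed to `.pipeline(...)`.
///
/// ```ignore
/// fn builder() -> TopologyBuilder<MySourceBuilder> {
///     TopologyBuilder::default()
///         .name("my-topology")
///         .failure_policy(TopologyFailurePolicy::Retry(3))
///         .graceful_shutdown(5_000)
///         .source(MySourceBuilder::default())
///         .pipeline(Pipeline::default())
/// }
/// ```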
#[derive(Default)]
pub struct TopologyBuilder<SB: SourceBuilder> {
pub options: TopologyOptions,
pub pipeline: Pipeline,
pub source_builder: SB,
}
impl<SB> TopologyBuilder<SB>
where
SB: SourceBuilder + Default,
<SB as SourceBuilder>::Source: Source + 'static + Default,
{
pub fn name(mut self, name: &'static str) -> Self {
self.options.name(name);
self
}
pub fn failure_policy(mut self, policy: TopologyFailurePolicy) -> Self {
self.options.failure_policy(policy);
self
}
pub fn msg_timeout(mut self, ms: usize) -> Self {
self.options.msg_timeout(ms);
self
}
pub fn graceful_shutdown(mut self, ms: u64) -> Self {
self.options.graceful_shutdown(ms);
self
}
pub fn metric_flush_interval(mut self, ms: u64) -> Self {
self.options.metric_flush_interval(ms);
self
}
pub fn metric_target(mut self, target: metric::MetricTarget) -> Self {
self.options.metric_target(target);
self
}
pub fn pipeline(mut self, pipe: Pipeline) -> Self {
self.pipeline = pipe.build();
self
}
pub fn source(mut self, sb: SB) -> Self {
self.source_builder = sb;
self
}
pub fn source_actor(&self) -> SourceActor {
SourceActor {
source: Box::new(self.source_builder.build()),
ack_queue: VecDeque::new(),
backoff: 1u64,
metrics: Metrics::default().named(vec!["source"]),
shutdown: false,
}
}
pub fn topology_actor(&self) -> TopologyActor {
TopologyActor {
options: self.options.clone(),
metrics: Metrics::default().named(vec!["topology"]),
retry: None,
}
}
pub fn pipeline_actor(&self) -> PipelineActor {
PipelineActor {
pipeline: self.pipeline.runtime(),
inflight: PipelineInflight::new(self.options.msg_timeout),
available: PipelineAvailable::new(self.pipeline.names()),
aggregate: PipelineAggregate::new(self.pipeline.names()),
metrics: Metrics::default().named(vec!["pipeline"]),
}
}
}
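/// Actix message asking an actor to stop taking on new work (e.g. the `SourceActor`
/// stops polling) so the topology can shut down gracefully.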
#[derive(Message, Debug)]
pub(crate) struct ShutdownMsg {}
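/// A single unit of work handed to a task: the originating source msg id, the
/// pipeline edge it travels on, its index within that edge, and the payload.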
#[derive(Serialize, Deserialize, Message, Debug)]
pub struct TaskMsg {
pub source_id: MsgId,
pub edge: Edge,
pub index: usize,
pub msg: Msg,
}
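/// Request/response pair used by task workers to pull available `TaskMsg`s for a
/// given task name (keyed by session id).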
#[derive(Message, Debug)]
pub enum TaskRequest {
GetAvailable(usize, String, Option<usize>),
GetAvailableResponse(usize, String, Option<Vec<TaskMsg>>),
}
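/// Result returned by a task worker: either an `Ack` (optionally carrying messages
/// for downstream tasks) or an `Error` for the given msg id, edge, and index.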
#[derive(Message, Serialize, Deserialize, Debug)]
pub enum TaskResponse {
Ack(MsgId, Edge, usize, Option<Vec<Msg>>),
Error(MsgId, Edge, usize),
}
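/// Placeholder source used by `SourceActor::default()`; it implements only `name`
/// and panics on `healthy`, so real topologies must supply their own source.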
pub struct DefaultSource {}
impl Source for DefaultSource {
fn name(&self) -> &'static str {
"Default"
}
fn healthy(&mut self) -> SourceResult<()> {
unimplemented!("Failed to run healthy check")
}
}
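/// Actor that wraps the topology `Source`: it polls for new messages, forwards
/// them to the `TopologyActor`, and acks processed msg ids back to the source
/// (individually or in batches) on a timer.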
pub struct SourceActor {
source: Box<dyn Source>,
ack_queue: VecDeque<MsgId>,
backoff: u64,
metrics: Metrics,
shutdown: bool,
}
impl Default for SourceActor {
fn default() -> Self {
SourceActor {
source: Box::new(DefaultSource {}),
ack_queue: VecDeque::new(),
backoff: 1u64,
metrics: Metrics::default().named(vec!["source"]),
shutdown: false,
}
}
}
impl SourceActor {
fn reset_backoff(&mut self) {
let poll_interval = match self.source.poll_interval() {
Ok(SourceInterval::Millisecond(ms)) => ms,
Err(_err) => &1000u64,
};
self.backoff = *poll_interval;
}
fn backoff(&mut self, bump: u64) {
// Avoid unwrap-panicking if the source can't report a max backoff; skip the bump instead.
if let Ok(max_backoff) = self.source.max_backoff() {
if self.backoff < *max_backoff {
self.backoff += bump;
}
}
}
fn monitor(&mut self, _ctx: &mut Context<Self>) {
let _ = self.source.monitor();
}
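/// Polls the source once, adjusts the backoff depending on whether messages were
/// returned, schedules the next poll (unless shutdown was requested), and forwards
/// results to the `TopologyActor`.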
fn poll(&mut self, ctx: &mut Context<Self>) {
trace!(
target: TARGET_SOURCE_ACTOR,
"SourceActor#poll before (backoff={})",
self.backoff
);
let results = match self.source.poll() {
Ok(option) => match option {
Some(results) => {
self.metrics
.incr_labels(vec!["poll"], vec![("status", "success")]);
results
}
None => vec![],
},
Err(_err) => {
self.metrics
.incr_labels(vec!["poll"], vec![("status", "error")]);
vec![]
}
};
let msg_count = results.len();
if msg_count == 0usize {
self.backoff(100u64);
} else {
self.reset_backoff();
}
if msg_count > 0usize {
self.metrics
.counter(vec!["msg", "read"], msg_count as isize);
}
self.metrics.gauge(vec!["backoff"], self.backoff as isize);
if !self.shutdown {
ctx.run_later(Duration::from_millis(self.backoff), Self::poll);
} else {
warn!(
target: TARGET_SOURCE_ACTOR,
"SourceActor shutdown enabled: stop polling"
);
}
let topology = TopologyActor::from_registry();
if !topology.connected() {
error!(
target: TARGET_SOURCE_ACTOR,
"TopologyActor#poll topology actor isn't connected"
);
self.metrics.counter_labels(
vec!["msg", "dropped"],
results.len() as isize,
vec![("from", "source"), ("reason", "topology_disconnected")],
);
return;
}
for msg in results {
match topology.try_send(msg) {
Err(SendError::Full(msg)) => {
error!(
target: TARGET_SOURCE_ACTOR,
"TopologyActor mailbox is full, dropping msg: {:?}", &msg
);
self.metrics.incr_labels(
vec!["msg", "dropped"],
vec![("from", "source"), ("reason", "topology_full")],
);
}
Err(SendError::Closed(msg)) => {
error!(
target: TARGET_SOURCE_ACTOR,
"TopologyActor is closed, dropping msg: {:?}", &msg
);
self.metrics.incr_labels(
vec!["msg", "dropped"],
vec![("from", "source"), ("reason", "topology_closed")],
);
}
Ok(_) => {
self.metrics
.incr_labels(vec!["msg", "moved"], vec![("to", "topology")]);
}
}
}
}
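/// Drains the ack queue and acks all pending msg ids against the source in one call.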
fn batch_ack(&mut self, _ctx: &mut Context<Self>) {
let msgs = self.ack_queue.drain(..).collect::<Vec<_>>();
let len = msgs.len();
if len > 0 {
trace!(target: TARGET_SOURCE_ACTOR, "Acking: {} msgs", &len);
let result = self.source.batch_ack(msgs);
self.ack_result(len, result);
}
}
fn individual_ack(&mut self, _ctx: &mut Context<Self>) {
let msgs = self.ack_queue.drain(..).collect::<Vec<_>>();
trace!(
target: TARGET_SOURCE_ACTOR,
"Ack individual msgs: {} msgs",
msgs.len()
);
for msg in msgs {
let result = self.source.ack(msg);
self.ack_result(1usize, result);
}
}
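/// Records ack metrics from a `(tried, acked)` result, distinguishing full success,
/// partial success, and outright errors.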
fn ack_result(&mut self, sent: usize, results: SourceResult<(i32, i32)>) {
match results {
Ok((tried, acked)) => {
let mut labels = vec![];
let error_count = (tried - acked).to_string();
if tried == acked {
labels.push(("status", "success"));
} else {
labels.push(("status", "partial_success"));
labels.push(("error_count", &error_count));
}
self.metrics
.counter_labels(vec!["msg", "acked"], acked as isize, labels);
}
Err(err) => {
error!(target: TARGET_SOURCE_ACTOR, "Ack msg err: {:?}", &err);
self.metrics.counter_labels(
vec!["msg", "acked"],
0isize,
vec![
("status", "error"),
("error_count", &(*&sent as isize).to_string()),
],
);
}
}
}
}
impl Actor for SourceActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
if let Err(err) = self.source.setup() {
warn!(
target: TARGET_SOURCE_ACTOR,
"Failed to set up source... triggering shutdown: {:?}", &err
);
System::current().stop();
self.metrics.incr(vec!["setup", "error"]);
return;
}
self.metrics.add_label("source_name", self.source.name());
self.reset_backoff();
ctx.run_later(Duration::from_millis(self.backoff), Self::poll);
let ack_interval = match self.source.ack_interval() {
Ok(v) => v,
Err(_err) => &SourceInterval::Millisecond(1000),
};
let duration = ack_interval.as_duration();
match self.source.ack_policy() {
Ok(policy) => match policy {
SourceAckPolicy::Batch(_batch_size) => {
ctx.run_interval(duration, Self::batch_ack);
}
SourceAckPolicy::Individual => {
ctx.run_interval(duration, Self::individual_ack);
}
SourceAckPolicy::None => {
warn!(
target: TARGET_SOURCE_ACTOR,
"SourceAckPolicy is None. Disabling ack interval"
);
}
},
_ => {}
}
let monitor_interval = self.source.monitor_interval();
if let Ok(SourceInterval::Millisecond(duration)) = &monitor_interval {
if duration > &0u64 {
let dur = monitor_interval.unwrap().as_duration();
warn!(
target: TARGET_SOURCE_ACTOR,
"Configuring source monitor with interval: {:?}", &dur
);
ctx.run_interval(dur, Self::monitor);
}
}
metric::backend::MetricsBackendActor::subscribe(
"SourceActor",
ctx.address().recipient(),
);
}
}
impl Supervised for SourceActor {}
impl SystemService for SourceActor {}
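/// Message asking the `SourceActor` to ack a msg id; it is queued (or ignored)
/// according to the source's `SourceAckPolicy`.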
#[derive(Message, Debug)]
pub struct SourceAckMsg(MsgId);
impl Handler<SourceAckMsg> for SourceActor {
type Result = ();
fn handle(&mut self, msg: SourceAckMsg, _ctx: &mut Context<Self>) {
match &self.source.ack_policy() {
Ok(policy) => match policy {
SourceAckPolicy::Batch(_) | SourceAckPolicy::Individual => {
self.ack_queue.push_back(msg.0);
}
SourceAckPolicy::None => {}
},
_ => {}
}
}
}
impl Handler<ShutdownMsg> for SourceActor {
type Result = ();
fn handle(&mut self, _msg: ShutdownMsg, _ctx: &mut Context<Self>) {
warn!(
target: TARGET_SOURCE_ACTOR,
"SourceActor received shutdown msg"
);
self.shutdown = true;
}
}
impl Handler<metric::backend::Flush> for SourceActor {
type Result = ();
fn handle(&mut self, _msg: metric::backend::Flush, _ctx: &mut Context<Self>) {
self.source.flush_metrics();
self.metrics.flush();
}
}
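/// Bookkeeping for `TopologyFailurePolicy::Retry`: failed source messages are kept
/// inflight and re-queued until they exceed the configured delivery count.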
#[derive(Default)]
struct TopologyRetry {
count: usize,
inflight: HashMap<MsgId, SourceMsg>,
queue: VecDeque<MsgId>,
}
impl TopologyRetry {
fn new(count: usize) -> Self {
Self {
count,
..Default::default()
}
}
fn store(&mut self, msg: SourceMsg) {
let msg_id = msg.id.clone();
self.inflight.insert(msg_id, msg);
}
fn put(&mut self, msg_id: MsgId) -> bool {
if let Some(msg) = self.inflight.get_mut(&msg_id) {
if msg.delivered < self.count {
self.queue.push_back(msg_id);
return true;
}
}
false
}
fn get(&mut self) -> Vec<SourceMsg> {
let mut retry = vec![];
for msg_id in self.queue.drain(..) {
if let Some(msg) = self.inflight.get_mut(&msg_id) {
msg.delivered += 1;
retry.push(msg.clone());
}
}
retry
}
fn delete(&mut self, msg_id: MsgId) {
self.inflight.remove(&msg_id);
}
}
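/// Central actor of a topology: it routes source messages into the pipeline,
/// relays task requests and responses, and applies the failure policy to
/// timeouts and errors.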
#[derive(Default)]
pub struct TopologyActor {
options: TopologyOptions,
metrics: Metrics,
retry: Option<TopologyRetry>,
}
impl TopologyActor {
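/// Applies the configured `TopologyFailurePolicy` to a failed msg id: ack it anyway
/// (BestEffort), queue it for retry, or drop it.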
fn handle_failure(&mut self, msg_id: MsgId) {
match &self.options.failure_policy {
Some(policy) => match policy {
TopologyFailurePolicy::BestEffort => {
let source = SourceActor::from_registry();
if !source.connected() {
error!(
target: TARGET_TOPOLOGY_ACTOR,
"SourceActor isn't connected, fail to generate BestEffort ack"
);
self.metrics.incr_labels(
vec!["source", "msg", "ack", "dropped"],
vec![("reason", "source_disconnected")],
);
return;
}
let _ = source.try_send(SourceAckMsg(msg_id));
}
TopologyFailurePolicy::Retry(count) => match self.retry.as_mut() {
Some(retry) => {
let msg_id2 = msg_id.clone();
if !retry.put(msg_id) {
retry.delete(msg_id2);
warn!(
target: TARGET_TOPOLOGY_ACTOR,
"TopologyFailurePolicy::Retry({}), dropped", &count
);
self.metrics.incr(vec!["source", "msg", "retry", "dropped"]);
} else {
warn!(
target: TARGET_TOPOLOGY_ACTOR,
"TopologyFailurePolicy::Retry({}), queue msg", &count
);
self.metrics.incr(vec!["source", "msg", "retry", "queued"]);
}
}
None => {}
},
TopologyFailurePolicy::None => {
warn!(
target: TARGET_TOPOLOGY_ACTOR,
"TopologyFailurePolicy::None, drop msg"
);
self.metrics.incr_labels(
vec!["source", "msg", "ack", "dropped"],
vec![("reason", "failure_policy_none")],
);
}
},
None => {
warn!(
target: TARGET_TOPOLOGY_ACTOR,
"TopologyFailurePolicy undefined, drop msg"
);
self.metrics.incr_labels(
vec!["source", "msg", "ack", "dropped"],
vec![("reason", "failure_policy_none")],
);
}
}
}
fn retry_failure(&mut self, ctx: &mut Context<Self>) {
let addr = ctx.address();
if let Some(retry) = self.retry.as_mut() {
for mut msg in retry.get() {
msg.ts = now_millis();
addr.do_send(msg);
}
}
}
}
impl Actor for TopologyActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
ctx.set_mailbox_capacity(0);
if let Some(TopologyFailurePolicy::Retry(count)) = self.options.failure_policy {
self.retry = Some(TopologyRetry::new(count));
ctx.run_interval(Duration::from_secs(60), Self::retry_failure);
}
metric::backend::MetricsBackendActor::subscribe(
"TopologyActor",
ctx.address().recipient(),
);
}
}
impl Supervised for TopologyActor {}
impl SystemService for TopologyActor {}
impl Handler<SourceMsg> for TopologyActor {
type Result = ();
fn handle(&mut self, msg: SourceMsg, _ctx: &mut Context<Self>) {
let pipeline = PipelineActor::from_registry();
if !pipeline.connected() {
error!(
target: TARGET_TOPOLOGY_ACTOR,
"PipelineActor isn't connected, dropping msg: {:?}", &msg
);
self.metrics.incr_labels(
vec!["msg", "dropped"],
vec![("from", "source"), ("reason", "pipeline_disconnected")],
);
return;
}
if msg.delivered == 0 {
if let Some(retry) = self.retry.as_mut() {
retry.store(msg.clone());
}
}
match pipeline.try_send(PipelineMsg::TaskRoot(msg)) {
Err(SendError::Full(msg)) => {
error!(
target: TARGET_TOPOLOGY_ACTOR,
"PipelineActor mailbox is full, dropping msg: {:?}", &msg
);
self.metrics.incr_labels(
vec!["msg", "dropped"],
vec![("from", "source"), ("reason", "pipeline_mailbox_full")],
);
}
Err(SendError::Closed(msg)) => {
error!(
target: TARGET_TOPOLOGY_ACTOR,
"PipelineActor is closed, dropping msg: {:?}", &msg
);
self.metrics.incr_labels(
vec!["msg", "dropped"],
vec![("from", "source"), ("reason", "pipeline_closed")],
);
}
Ok(_) => {
self.metrics
.incr_labels(vec!["msg", "moved"], vec![("to", "pipeline")]);
}
}
}
}
impl Handler<TaskRequest> for TopologyActor {
type Result = ();
fn handle(&mut self, msg: TaskRequest, _ctx: &mut Context<Self>) {
match &msg {
TaskRequest::GetAvailable(_, _, _) => {
let pipeline = PipelineActor::from_registry();
if !pipeline.connected() {
error!(
target: TARGET_TOPOLOGY_ACTOR,
"PipelineActor isn't connected, dropping GetAvailable request"
);
self.metrics.incr_labels(
vec!["task", "request", "dropped"],
vec![("reason", "pipeline_disconnected")],
);
return;
}
pipeline.do_send(msg);
}
TaskRequest::GetAvailableResponse(_, _, _) => {
let topology_server = TopologyServer::from_registry();
if !topology_server.connected() {
error!(
target: TARGET_TOPOLOGY_ACTOR,
"TopologyServer isn't connected, dropping GetAvailableResponse"
);
self.metrics.incr_labels(
vec!["task", "response", "dropped"],
vec![("reason", "topology_server_connected_error")],
);
return;
}
topology_server.do_send(msg);
}
}
}
}
impl Handler<TaskResponse> for TopologyActor {
type Result = ();
fn handle(&mut self, msg: TaskResponse, _ctx: &mut Context<Self>) {
let pipeline = PipelineActor::from_registry();
if !pipeline.connected() {
error!(
target: TARGET_TOPOLOGY_ACTOR,
"PipelineActor isn't connected, dropping TaskResponse"
);
self.metrics.incr_labels(
vec!["task", "response", "dropped"],
vec![("reason", "pipeline_disconnected")],
);
return;
}
let pipe_msg = match &msg {
TaskResponse::Ack(_, _, _, _) => PipelineMsg::TaskAck(msg),
TaskResponse::Error(_, _, _) => PipelineMsg::TaskError(msg),
};
pipeline.do_send(pipe_msg);
}
}
impl Handler<PipelineMsg> for TopologyActor {
type Result = ();
fn handle(&mut self, msg: PipelineMsg, _ctx: &mut Context<Self>) {
match msg {
PipelineMsg::SourceMsgAck(msg_id) => {
let source = SourceActor::from_registry();
if !source.connected() {
error!(
target: TARGET_TOPOLOGY_ACTOR,
"SourceActor isn't connected, dropping PipelineMsg::SourceMsgAck"
);
self.metrics.incr_labels(
vec!["source", "msg", "ack", "dropped"],
vec![("reason", "source_disconnected")],
);
return;
}
if let Some(retry) = self.retry.as_mut() {
retry.delete(msg_id.clone());
}
let _ = source.try_send(SourceAckMsg(msg_id));
self.metrics
.incr_labels(vec!["source", "msg", "ack"], vec![("from", "pipeline")]);
}
PipelineMsg::SourceMsgTimeout(msg_id) => {
debug!(
target: TARGET_TOPOLOGY_ACTOR,
"Pipeline timeout msg: {:?}", &msg_id
);
self.handle_failure(msg_id);
self.metrics
.incr_labels(vec!["source", "msg", "timeout"], vec![("from", "pipeline")]);
}
PipelineMsg::SourceMsgError(msg_id) => {
debug!(
target: TARGET_TOPOLOGY_ACTOR,
"Pipeline error msg: {:?}", &msg_id
);
self.handle_failure(msg_id);
self.metrics
.incr_labels(vec!["source", "msg", "error"], vec![("from", "pipeline")]);
}
_ => {}
}
}
}
impl Handler<metric::backend::Flush> for TopologyActor {
type Result = ();
fn handle(&mut self, _msg: metric::backend::Flush, _ctx: &mut Context<Self>) {
self.metrics.flush();
}
}
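/// Messages exchanged between the `TopologyActor` and `PipelineActor`: new root
/// messages, task acks/errors, and the resulting source-level ack, timeout, or
/// error notifications.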
#[derive(Message, Debug)]
pub enum PipelineMsg {
TaskRoot(SourceMsg),
TaskAck(TaskResponse),
TaskError(TaskResponse),
SourceMsgAck(MsgId),
SourceMsgTimeout(MsgId),
SourceMsgError(MsgId),
}
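/// Actor that tracks messages as they move through the pipeline graph: which edges
/// are inflight, which task messages are available to workers, and which results
/// are being aggregated for downstream tasks.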
#[derive(Default)]
pub struct PipelineActor {
pub pipeline: Pipeline,
pub inflight: PipelineInflight,
pub available: PipelineAvailable,
pub aggregate: PipelineAggregate,
pub metrics: Metrics,
}
impl Actor for PipelineActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
metric::backend::MetricsBackendActor::subscribe(
"PipelineActor",
ctx.address().recipient(),
);
}
}
impl Supervised for PipelineActor {}
impl SystemService for PipelineActor {}
impl PipelineActor {
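/// Seeds the pipeline with a new source message: marks the root edge inflight and
/// makes the first `TaskMsg` available to the root task.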
pub fn task_root(&mut self, src_msg: SourceMsg) {
let matrix = self.pipeline.matrix.clone();
let root = matrix[0].0.to_string();
let task_name = matrix[0].1.to_string();
let edge = (root, task_name.clone());
let mut msg_state = PipelineMsgState::new(matrix);
msg_state.edge_start(edge.clone(), 1);
self.inflight
.root(src_msg.id.clone(), src_msg.ts.clone(), msg_state);
let task_msg = TaskMsg {
source_id: src_msg.id.clone(),
edge,
index: 0,
msg: src_msg.msg.clone(),
};
self.available.push(&task_name, task_msg);
self.metrics
.incr_labels(vec!["task", "available"], vec![("task_name", &task_name)]);
}
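/// Handles a task ack or error: holds returned messages for descendant tasks,
/// updates inflight state, and acks, times out, or errors the source message once
/// its pipeline state is resolved.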
pub fn task_ack(&mut self, task_resp: TaskResponse) {
match task_resp {
TaskResponse::Ack(msg_id, edge, index, task_result) => {
let ack_name = &edge.1[..];
let empty = Vec::new();
let descendants = self.pipeline.descendants.get(ack_name).unwrap_or(&empty);
if let Some(msgs) = task_result {
for name in descendants.iter() {
self.aggregate
.hold(&name.to_string(), msg_id.clone(), msgs.clone());
}
}
let status = self.inflight.ack(&msg_id, &edge, index);
match status {
PipelineInflightStatus::AckEdge(task_visited) => {
if task_visited {
let mut ack_source = false;
for name in descendants.iter() {
let next_edge = (ack_name.to_string(), name.to_string());
match self.aggregate.remove(&name.to_string(), &msg_id) {
Some(msgs) => {
if let Some((_ts, msg_state)) = self.inflight.get_mut(&msg_id) {
msg_state.edge_start(next_edge.clone(), msgs.len());
}
for (index, msg) in msgs.iter().enumerate() {
let task_msg = TaskMsg {
source_id: msg_id.clone(),
edge: next_edge.clone(),
index,
msg: msg.to_vec(),
};
self.available.push(&name.to_string(), task_msg);
}
}
None => {
self.inflight.ack_dead_end(
&msg_id,
&next_edge,
&self.pipeline,
);
if self.inflight.finished(&msg_id) {
ack_source = true;
break;
}
}
}
}
if ack_source {
debug!(
target: TARGET_PIPELINE_ACTOR,
"ack_deadend triggered force ack source for this msg_id: {:?}",
&msg_id
);
let topology = TopologyActor::from_registry();
if !topology.connected() {
error!(target: TARGET_PIPELINE_ACTOR, "TopologyActor isn't connected, skipping PipelineMsg::SourceMsgAck");
return;
}
topology.do_send(PipelineMsg::SourceMsgAck(msg_id));
}
}
}
PipelineInflightStatus::AckSource => {
let topology = TopologyActor::from_registry();
if !topology.connected() {
error!(
target: TARGET_PIPELINE_ACTOR,
"TopologyActor isn't connected, skipping PipelineMsg::SourceMsgAck"
);
return;
}
self.cleanup(&msg_id);
topology.do_send(PipelineMsg::SourceMsgAck(msg_id));
}
PipelineInflightStatus::PendingEdge => {
debug!(
target: TARGET_PIPELINE_ACTOR,
"PendingEdge: waiting to finish msg state: {:?}", &msg_id
);
}
PipelineInflightStatus::Removed => {
debug!(
target: TARGET_PIPELINE_ACTOR,
"PipelineInflightStatus::Removed {:?}", &msg_id
);
self.cleanup(&msg_id);
}
PipelineInflightStatus::Timeout => {
let topology = TopologyActor::from_registry();
if !topology.connected() {
error!(
target: TARGET_PIPELINE_ACTOR,
"TopologyActor isn't connected, skipping PipelineMsg::SourceMsgTimeout"
);
return;
}
self.cleanup(&msg_id);
topology.do_send(PipelineMsg::SourceMsgTimeout(msg_id));
}
}
}
TaskResponse::Error(msg_id, ..) => {
let topology = TopologyActor::from_registry();
if !topology.connected() {
error!(
target: TARGET_PIPELINE_ACTOR,
"TopologyActor isn't connected, skipping PipelineMsg::SourceMsgError"
);
return;
}
self.cleanup(&msg_id);
topology.do_send(PipelineMsg::SourceMsgError(msg_id));
}
}
}
pub fn cleanup(&mut self, msg_id: &MsgId) {
self.inflight.clean_msg_id(msg_id);
self.aggregate.clean_msg_id(msg_id);
}
}
impl Handler<PipelineMsg> for PipelineActor {
type Result = ();
fn handle(&mut self, msg: PipelineMsg, _ctx: &mut Context<Self>) {
match msg {
PipelineMsg::TaskRoot(src_msg) => {
debug!(
target: TARGET_PIPELINE_ACTOR,
"PipelineMsg::TaskRoot(src_msg) = {:?}", &src_msg
);
self.task_root(src_msg);
}
PipelineMsg::TaskAck(task_resp) => {
debug!(
target: TARGET_PIPELINE_ACTOR,
"PipelineMsg::TaskAck(task_resp) = {:?}", &task_resp
);
self.task_ack(task_resp);
}
PipelineMsg::TaskError(task_resp) => {
debug!(
target: TARGET_PIPELINE_ACTOR,
"PipelineMsg::TaskError(task_resp) = {:?}", &task_resp
);
self.task_ack(task_resp);
}
_ => {
warn!(
target: TARGET_PIPELINE_ACTOR,
"Handler<PipelineMsg> for PipelineMsg reach match unimplemented match arm for msg: {:?}",
&msg
);
}
}
}
}
impl Handler<TaskRequest> for PipelineActor {
type Result = ();
fn handle(&mut self, msg: TaskRequest, _ctx: &mut Context<Self>) {
match msg {
TaskRequest::GetAvailable(session_id, name, count) => {
let topology = TopologyActor::from_registry();
if !topology.connected() {
error!(
target: TARGET_PIPELINE_ACTOR,
"TopologyActor isn't connected, skipping TaskRequest::GetAvailable"
);
return;
}
trace!(
target: TARGET_PIPELINE_ACTOR,
"TaskRequest::GetAvailable(session_id, name, count): {}, {}, {:?}",
&session_id,
&name,
&count
);
let tasks = self.available.pop(&name, count);
topology.do_send(TaskRequest::GetAvailableResponse(session_id, name, tasks));
}
_ => {
warn!(
target: TARGET_PIPELINE_ACTOR,
"Handler<TaskRequest> for TaskRequest only implements TaskRequest::GetAvailable"
);
}
}
}
}
impl Handler<metric::backend::Flush> for PipelineActor {
type Result = ();
fn handle(&mut self, _msg: metric::backend::Flush, _ctx: &mut Context<Self>) {
self.metrics
.gauge(vec!["inflight"], self.inflight.size() as isize);
let available_stats = self.available.stats();
trace!("Available stats len: {}", &available_stats.len());
for (task, size) in available_stats {
self.metrics
.gauge_labels(vec!["available"], size, vec![("task", &task)]);
}
let aggregate_stats = self.aggregate.stats();
trace!("Aggregate stats len: {}", &aggregate_stats.len());
for (task, size) in aggregate_stats {
self.metrics
.gauge_labels(vec!["aggregate"], size, vec![("task", &task)]);
}
self.metrics.flush();
}
}