use crate::proto;
use crate::proto::prost::Message as _;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum HistoryPropagationScope {
OwnHistory,
Lineage,
}
impl From<HistoryPropagationScope> for proto::HistoryPropagationScope {
fn from(scope: HistoryPropagationScope) -> Self {
match scope {
HistoryPropagationScope::OwnHistory => proto::HistoryPropagationScope::OwnHistory,
HistoryPropagationScope::Lineage => proto::HistoryPropagationScope::Lineage,
}
}
}
impl TryFrom<proto::HistoryPropagationScope> for HistoryPropagationScope {
type Error = ();
fn try_from(scope: proto::HistoryPropagationScope) -> std::result::Result<Self, Self::Error> {
match scope {
proto::HistoryPropagationScope::OwnHistory => Ok(Self::OwnHistory),
proto::HistoryPropagationScope::Lineage => Ok(Self::Lineage),
proto::HistoryPropagationScope::None => Err(()),
}
}
}
impl HistoryPropagationScope {
pub(crate) fn to_proto(self) -> proto::HistoryPropagationScope {
self.into()
}
pub(crate) fn from_proto(scope: proto::HistoryPropagationScope) -> Option<Self> {
Self::try_from(scope).ok()
}
}
#[derive(Clone, Debug)]
pub struct PropagatedHistoryChunk {
pub app_id: String,
pub instance_id: String,
pub workflow_name: String,
pub start_event_index: i32,
pub event_count: i32,
pub events: Vec<proto::HistoryEvent>,
}
#[derive(Clone, Debug)]
pub struct PropagatedHistory {
pub scope: HistoryPropagationScope,
pub events: Vec<proto::HistoryEvent>,
pub chunks: Vec<PropagatedHistoryChunk>,
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("propagated history: {kind} '{name}' not found")]
pub struct PropagationNotFoundError {
pub kind: &'static str,
pub name: String,
}
impl PropagatedHistory {
pub fn from_proto(p: proto::PropagatedHistory) -> Option<Self> {
let scope = HistoryPropagationScope::from_proto(
proto::HistoryPropagationScope::try_from(p.scope).ok()?,
)?;
let mut all_events = Vec::new();
let mut chunks = Vec::with_capacity(p.chunks.len());
for raw in p.chunks {
let start_event_index = all_events.len() as i32;
let mut decoded = Vec::with_capacity(raw.raw_events.len());
for ev_bytes in &raw.raw_events {
if let Ok(ev) = proto::HistoryEvent::decode(ev_bytes.as_slice()) {
decoded.push(ev);
}
}
let event_count = raw.raw_events.len() as i32;
all_events.extend_from_slice(&decoded);
chunks.push(PropagatedHistoryChunk {
app_id: raw.app_id,
instance_id: raw.instance_id,
workflow_name: raw.workflow_name,
start_event_index,
event_count,
events: decoded,
});
}
Some(Self {
scope,
events: all_events,
chunks,
})
}
pub fn app_ids(&self) -> Vec<String> {
let mut seen = std::collections::HashSet::new();
self.chunks
.iter()
.filter(|c| seen.insert(c.app_id.as_str()))
.map(|c| c.app_id.clone())
.collect()
}
pub fn workflow_by_name(
&self,
name: &str,
) -> Result<&PropagatedHistoryChunk, PropagationNotFoundError> {
self.chunks
.iter()
.find(|c| c.workflow_name == name)
.ok_or_else(|| PropagationNotFoundError {
kind: "workflow",
name: name.to_string(),
})
}
pub fn events_by_app_id(
&self,
app_id: &str,
) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
let mut out = Vec::new();
let mut found = false;
for c in &self.chunks {
if c.app_id == app_id {
found = true;
out.extend(c.events.iter().cloned());
}
}
if found {
Ok(out)
} else {
Err(PropagationNotFoundError {
kind: "app id",
name: app_id.to_string(),
})
}
}
pub fn events_by_instance_id(
&self,
instance_id: &str,
) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
let mut out = Vec::new();
let mut found = false;
for c in &self.chunks {
if c.instance_id == instance_id {
found = true;
out.extend(c.events.iter().cloned());
}
}
if found {
Ok(out)
} else {
Err(PropagationNotFoundError {
kind: "instance id",
name: instance_id.to_string(),
})
}
}
pub fn events_by_workflow_name(
&self,
name: &str,
) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
let mut out = Vec::new();
let mut found = false;
for c in &self.chunks {
if c.workflow_name == name {
found = true;
out.extend(c.events.iter().cloned());
}
}
if found {
Ok(out)
} else {
Err(PropagationNotFoundError {
kind: "workflow",
name: name.to_string(),
})
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::proto::prost::Message;
fn ev(id: i32) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id: id,
timestamp: None,
router: None,
event_type: None,
}
}
fn raw_chunk(app: &str, inst: &str, wf: &str, n: i32) -> proto::PropagatedHistoryChunk {
let raw_events = (0..n).map(|i| ev(i).encode_to_vec()).collect();
proto::PropagatedHistoryChunk {
raw_events,
app_id: app.to_string(),
instance_id: inst.to_string(),
workflow_name: wf.to_string(),
raw_signatures: vec![],
signing_cert_chains: vec![],
}
}
#[test]
fn from_proto_none_returns_none() {
let p = proto::PropagatedHistory {
scope: proto::HistoryPropagationScope::None as i32,
chunks: vec![],
};
assert!(PropagatedHistory::from_proto(p).is_none());
}
#[test]
fn from_proto_decodes_chunks_and_flattens_events() {
let p = proto::PropagatedHistory {
scope: proto::HistoryPropagationScope::Lineage as i32,
chunks: vec![
raw_chunk("app-a", "inst-a", "WfA", 2),
raw_chunk("app-b", "inst-b", "WfB", 3),
],
};
let h = PropagatedHistory::from_proto(p).expect("scope set");
assert_eq!(h.scope, HistoryPropagationScope::Lineage);
assert_eq!(h.events.len(), 5);
assert_eq!(h.chunks.len(), 2);
assert_eq!(h.chunks[0].start_event_index, 0);
assert_eq!(h.chunks[0].event_count, 2);
assert_eq!(h.chunks[1].start_event_index, 2);
assert_eq!(h.chunks[1].event_count, 3);
}
#[test]
fn app_ids_are_deduplicated_in_chain_order() {
let p = proto::PropagatedHistory {
scope: proto::HistoryPropagationScope::Lineage as i32,
chunks: vec![
raw_chunk("app-a", "i1", "Wf1", 1),
raw_chunk("app-b", "i2", "Wf2", 1),
raw_chunk("app-a", "i3", "Wf3", 1),
],
};
let h = PropagatedHistory::from_proto(p).unwrap();
assert_eq!(h.app_ids(), vec!["app-a".to_string(), "app-b".to_string()]);
}
#[test]
fn filters_return_not_found_for_missing_names() {
let p = proto::PropagatedHistory {
scope: proto::HistoryPropagationScope::OwnHistory as i32,
chunks: vec![raw_chunk("app-a", "inst", "WfA", 1)],
};
let h = PropagatedHistory::from_proto(p).unwrap();
assert!(h.workflow_by_name("missing").is_err());
assert!(h.events_by_app_id("missing").is_err());
assert!(h.events_by_instance_id("missing").is_err());
assert!(h.events_by_workflow_name("missing").is_err());
}
#[test]
fn filters_return_matching_events() {
let p = proto::PropagatedHistory {
scope: proto::HistoryPropagationScope::Lineage as i32,
chunks: vec![
raw_chunk("app-a", "inst-a", "WfA", 2),
raw_chunk("app-b", "inst-b", "WfB", 3),
],
};
let h = PropagatedHistory::from_proto(p).unwrap();
assert_eq!(h.events_by_app_id("app-a").unwrap().len(), 2);
assert_eq!(h.events_by_instance_id("inst-b").unwrap().len(), 3);
assert_eq!(h.events_by_workflow_name("WfA").unwrap().len(), 2);
assert_eq!(h.workflow_by_name("WfB").unwrap().instance_id, "inst-b");
}
}