use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub type NodeId = usize;
pub type EdgeId = usize;
pub const META_DIGEST: &str = "digest";
pub const META_PERMISSIONS: &str = "permissions";
pub const META_IDENTITY_SCOPE: &str = "identity_scope";
pub const META_INFERRED: &str = "inferred";
pub const META_CONTAINER: &str = "container";
pub const META_OIDC: &str = "oidc";
pub const META_CLI_FLAG_EXPOSED: &str = "cli_flag_exposed";
pub const META_TRIGGER: &str = "trigger";
pub const META_WRITES_ENV_GATE: &str = "writes_env_gate";
pub const META_ATTESTS: &str = "attests";
pub const META_VARIABLE_GROUP: &str = "variable_group";
pub const META_SELF_HOSTED: &str = "self_hosted";
pub const META_CHECKOUT_SELF: &str = "checkout_self";
pub const META_SERVICE_CONNECTION: &str = "service_connection";
pub const META_IMPLICIT: &str = "implicit";
pub const META_ENV_APPROVAL: &str = "env_approval";
pub const META_JOB_NAME: &str = "job_name";
pub const META_REPOSITORIES: &str = "repositories";
pub const META_SCRIPT_BODY: &str = "script_body";
pub const META_SERVICE_CONNECTION_NAME: &str = "service_connection_name";
pub const META_TERRAFORM_AUTO_APPROVE: &str = "terraform_auto_approve";
pub const META_ADD_SPN_TO_ENV: &str = "add_spn_to_environment";
pub fn is_sha_pinned(ref_str: &str) -> bool {
ref_str.contains('@')
&& ref_str
.split('@')
.next_back()
.map(|s| s.len() >= 40 && s.chars().all(|c| c.is_ascii_hexdigit()))
.unwrap_or(false)
}
pub fn is_docker_digest_pinned(image: &str) -> bool {
image.contains("@sha256:")
&& image
.split("@sha256:")
.nth(1)
.map(|h| h.len() == 64 && h.chars().all(|c| c.is_ascii_hexdigit()))
.unwrap_or(false)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AuthorityCompleteness {
Complete,
Partial,
Unknown,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IdentityScope {
Broad,
Constrained,
Unknown,
}
impl IdentityScope {
pub fn from_permissions(perms: &str) -> Self {
let p = perms.to_lowercase();
if p.contains("write-all") || p.contains("admin") || p == "{}" || p.is_empty() {
IdentityScope::Broad
} else if p.contains("write") {
IdentityScope::Broad
} else if p.contains("read") {
IdentityScope::Constrained
} else {
IdentityScope::Unknown
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NodeKind {
Step,
Secret,
Artifact,
Identity,
Image,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TrustZone {
FirstParty,
ThirdParty,
Untrusted,
}
impl TrustZone {
pub fn is_lower_than(&self, other: &TrustZone) -> bool {
self.rank() < other.rank()
}
fn rank(&self) -> u8 {
match self {
TrustZone::FirstParty => 2,
TrustZone::ThirdParty => 1,
TrustZone::Untrusted => 0,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
pub id: NodeId,
pub kind: NodeKind,
pub name: String,
pub trust_zone: TrustZone,
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EdgeKind {
HasAccessTo,
Produces,
Consumes,
UsesImage,
DelegatesTo,
PersistsTo,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Edge {
pub id: EdgeId,
pub from: NodeId,
pub to: NodeId,
pub kind: EdgeKind,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineSource {
pub file: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub repo: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub git_ref: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub commit_sha: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParamSpec {
pub param_type: String,
pub has_values_allowlist: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthorityGraph {
pub source: PipelineSource,
pub nodes: Vec<Node>,
pub edges: Vec<Edge>,
pub completeness: AuthorityCompleteness,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub completeness_gaps: Vec<String>,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub metadata: HashMap<String, String>,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub parameters: HashMap<String, ParamSpec>,
}
impl AuthorityGraph {
pub fn new(source: PipelineSource) -> Self {
Self {
source,
nodes: Vec::new(),
edges: Vec::new(),
completeness: AuthorityCompleteness::Complete,
completeness_gaps: Vec::new(),
metadata: HashMap::new(),
parameters: HashMap::new(),
}
}
pub fn mark_partial(&mut self, reason: impl Into<String>) {
self.completeness = AuthorityCompleteness::Partial;
self.completeness_gaps.push(reason.into());
}
pub fn add_node(
&mut self,
kind: NodeKind,
name: impl Into<String>,
trust_zone: TrustZone,
) -> NodeId {
let id = self.nodes.len();
self.nodes.push(Node {
id,
kind,
name: name.into(),
trust_zone,
metadata: HashMap::new(),
});
id
}
pub fn add_node_with_metadata(
&mut self,
kind: NodeKind,
name: impl Into<String>,
trust_zone: TrustZone,
metadata: HashMap<String, String>,
) -> NodeId {
let id = self.nodes.len();
self.nodes.push(Node {
id,
kind,
name: name.into(),
trust_zone,
metadata,
});
id
}
pub fn add_edge(&mut self, from: NodeId, to: NodeId, kind: EdgeKind) -> EdgeId {
let id = self.edges.len();
self.edges.push(Edge { id, from, to, kind });
id
}
pub fn edges_from(&self, id: NodeId) -> impl Iterator<Item = &Edge> {
self.edges.iter().filter(move |e| e.from == id)
}
pub fn edges_to(&self, id: NodeId) -> impl Iterator<Item = &Edge> {
self.edges.iter().filter(move |e| e.to == id)
}
pub fn authority_sources(&self) -> impl Iterator<Item = &Node> {
self.nodes
.iter()
.filter(|n| matches!(n.kind, NodeKind::Secret | NodeKind::Identity))
}
pub fn nodes_of_kind(&self, kind: NodeKind) -> impl Iterator<Item = &Node> {
self.nodes.iter().filter(move |n| n.kind == kind)
}
pub fn nodes_in_zone(&self, zone: TrustZone) -> impl Iterator<Item = &Node> {
self.nodes.iter().filter(move |n| n.trust_zone == zone)
}
pub fn node(&self, id: NodeId) -> Option<&Node> {
self.nodes.get(id)
}
pub fn edge(&self, id: EdgeId) -> Option<&Edge> {
self.edges.get(id)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn build_simple_graph() {
let mut g = AuthorityGraph::new(PipelineSource {
file: "deploy.yml".into(),
repo: None,
git_ref: None,
commit_sha: None,
});
let secret = g.add_node(NodeKind::Secret, "AWS_KEY", TrustZone::FirstParty);
let step_build = g.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
let artifact = g.add_node(NodeKind::Artifact, "dist.tar.gz", TrustZone::FirstParty);
let step_deploy = g.add_node(NodeKind::Step, "deploy", TrustZone::ThirdParty);
g.add_edge(step_build, secret, EdgeKind::HasAccessTo);
g.add_edge(step_build, artifact, EdgeKind::Produces);
g.add_edge(artifact, step_deploy, EdgeKind::Consumes);
assert_eq!(g.nodes.len(), 4);
assert_eq!(g.edges.len(), 3);
assert_eq!(g.authority_sources().count(), 1);
assert_eq!(g.edges_from(step_build).count(), 2);
assert_eq!(g.edges_from(artifact).count(), 1); }
#[test]
fn completeness_default_is_complete() {
let g = AuthorityGraph::new(PipelineSource {
file: "test.yml".into(),
repo: None,
git_ref: None,
commit_sha: None,
});
assert_eq!(g.completeness, AuthorityCompleteness::Complete);
assert!(g.completeness_gaps.is_empty());
}
#[test]
fn mark_partial_records_reason() {
let mut g = AuthorityGraph::new(PipelineSource {
file: "test.yml".into(),
repo: None,
git_ref: None,
commit_sha: None,
});
g.mark_partial("secrets in run: block inferred, not precisely mapped");
assert_eq!(g.completeness, AuthorityCompleteness::Partial);
assert_eq!(g.completeness_gaps.len(), 1);
}
#[test]
fn identity_scope_from_permissions() {
assert_eq!(
IdentityScope::from_permissions("write-all"),
IdentityScope::Broad
);
assert_eq!(
IdentityScope::from_permissions("{ contents: write }"),
IdentityScope::Broad
);
assert_eq!(
IdentityScope::from_permissions("{ contents: read }"),
IdentityScope::Constrained
);
assert_eq!(
IdentityScope::from_permissions("{ id-token: write }"),
IdentityScope::Broad
);
assert_eq!(IdentityScope::from_permissions(""), IdentityScope::Broad);
assert_eq!(
IdentityScope::from_permissions("custom-scope"),
IdentityScope::Unknown
);
}
#[test]
fn trust_zone_ordering() {
assert!(TrustZone::Untrusted.is_lower_than(&TrustZone::FirstParty));
assert!(TrustZone::ThirdParty.is_lower_than(&TrustZone::FirstParty));
assert!(TrustZone::Untrusted.is_lower_than(&TrustZone::ThirdParty));
assert!(!TrustZone::FirstParty.is_lower_than(&TrustZone::FirstParty));
}
}