Skip to main content

taudit_sink_cloudevents/
lib.rs

1use serde::Serialize;
2use taudit_core::baselines::compute_pipeline_identity_material_hash;
3use taudit_core::error::TauditError;
4use taudit_core::finding::{
5    compute_finding_group_id, compute_fingerprint, compute_suppression_key, rule_id_for, Finding,
6    FindingCategory,
7};
8use taudit_core::graph::AuthorityGraph;
9use taudit_core::ports::ReportSink;
10
11// ---------------------------------------------------------------------------
12// CloudEvents 1.0 envelope — hand-rolled, matches CellOS pattern.
13// No dependency on cloudevents-sdk (0.9.x, pre-1.0, unstable API).
14// ---------------------------------------------------------------------------
15
/// Minimal CloudEvents 1.0 JSON envelope.
///
/// Serialized with `serde`, so the JSON key order follows the field order
/// declared here. Every `Option` field carries
/// `skip_serializing_if = "Option::is_none"`, so an unset value is omitted
/// from the JSON rather than emitted as `null`.
#[derive(Debug, Clone, Serialize)]
pub struct CloudEventV1 {
    /// CloudEvents spec version. This crate always writes `"1.0"`.
    pub specversion: String,
    /// Per-event identifier. This crate mints a fresh UUID v4 for every
    /// event, so it is NOT stable across re-runs — use
    /// `tauditfindingfingerprint` for cross-run dedup.
    pub id: String,
    /// Event producer identifier (`"taudit"` for events built by this crate).
    pub source: String,
    /// CloudEvents `type`, serialized under the JSON key `type` (`type` is a
    /// Rust keyword, hence the `ty` field name plus serde rename). Shape for
    /// events built by this crate: `io.taudit.finding.<category_suffix>`.
    #[serde(rename = "type")]
    pub ty: String,
    /// Subject of the event. This crate fills it with the scanned pipeline
    /// file path (`graph.source.file`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub subject: Option<String>,
    /// Media type of `data`. This crate writes `"application/json"`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub datacontenttype: Option<String>,
    /// Event creation timestamp as an RFC 3339 string (UTC wall-clock time
    /// at envelope construction for events built by this crate).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub time: Option<String>,
    /// Event payload. For events built by this crate: the finding serialized
    /// to JSON, or the bare finding message as a JSON string if
    /// serialization fails.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<serde_json::Value>,
    // Extension attributes — CloudEvents 1.0 allows arbitrary top-level keys.
    /// Authority graph completeness: "complete", "partial", or "unknown".
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tauditcompleteness: Option<String>,
    /// Structured per-gap detail for `Partial` graphs. Each entry pairs the
    /// typed `GapKind` (`expression` | `structural` | `opaque`, serde
    /// snake_case) with the original human-readable reason string. Lets SIEMs
    /// route or suppress events by gap *category* without parsing prose.
    /// Omitted entirely (no null, no empty array) when the graph records no
    /// gap kinds — the expected state for `Complete` / `Unknown` graphs.
    /// Per CloudEvents 1.0, extension attribute names must be lowercase with
    /// no separators — hence `tauditcompletenessgaps`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tauditcompletenessgaps: Option<Vec<serde_json::Value>>,
    /// Stable cross-run finding fingerprint. 16 lowercase hex chars,
    /// byte-identical to SARIF `partialFingerprints[primaryLocationLineHash]`
    /// and JSON `findings[].fingerprint`. SIEMs key on this attribute to
    /// dedup findings across re-runs. Per CloudEvents 1.0, extension
    /// attribute names must be lowercase with no separators — hence
    /// `tauditfindingfingerprint` rather than the dashed/snaked form.
    pub tauditfindingfingerprint: String,
    /// Operator-stable waiver key. Coarser than
    /// `tauditfindingfingerprint`; use for suppressions that should survive
    /// harmless surrounding workflow edits.
    pub tauditsuppressionkey: String,
    /// Canonical snake_case rule id, byte-identical to JSON
    /// `findings[].rule_id` and SARIF `result.ruleId`. The CloudEvents
    /// `type` field stays scoped to the `FindingCategory` (so SIEM routing
    /// rules remain stable across rule additions); this extension exposes
    /// the precise rule that fired so consumers can filter / suppress at
    /// rule granularity. For custom YAML rules with a `[id] …` message
    /// prefix the bracketed id wins, matching `taudit_core::finding::
    /// rule_id_for`. Per CloudEvents 1.0 §3.1, extension attribute names
    /// must be lowercase with no separators — hence `tauditruleid`.
    pub tauditruleid: String,
    /// CI/CD platform of the underlying pipeline: `"ado"`, `"gha"`, or
    /// `"gitlab"`. Lets SIEM correlation rules route events by platform
    /// without re-parsing the `subject` (file path). Source: the resolved
    /// `Platform` variant for the scanned file, surfaced via
    /// `graph.metadata["platform"]`. Optional in v1 for backward-compat;
    /// always emitted by current taudit when the parser stamped the
    /// metadata key with one of the three permitted values.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tauditplatform: Option<String>,
    /// Stable UUID v5 over the fingerprint. Same value as JSON
    /// `findings[].finding_group_id` and SARIF `properties.findingGroupId`.
    /// SIEMs `SELECT DISTINCT ON (tauditfindinggroup)` to collapse
    /// per-hop findings against the same authority root into one event.
    /// CloudEvents 1.0 attribute names must be lowercase with no
    /// separators — hence `tauditfindinggroup` (no underscore).
    pub tauditfindinggroup: String,
    /// Shared correlation key for a single operator flow.
    pub correlationid: String,
    /// Stable pipeline identifier URN. Prefer caller/parser-supplied
    /// `graph.metadata["pipeline_content_hash"]` /
    /// `graph.metadata["pipeline_identity_material_hash"]` when present,
    /// otherwise fall back to deterministic derivation from authority-graph
    /// identity material. Shape: `urn:taudit:pipeline:sha256:<64-hex>`.
    pub tauditpipelineid: String,
    /// Per-invocation scan-run identifier shared by all findings emitted in a
    /// single `emit` call. Distinct from `correlationid`, which is the
    /// cross-tool operator-flow join key and may intentionally span multiple
    /// scans.
    pub tauditscanrunid: String,
    /// Repository that emitted the event.
    pub provenancerepo: String,
    /// Binary, command, or subsystem that produced the event.
    pub provenanceproducer: String,
    /// Producer version or schema version.
    pub provenanceversion: String,
    /// High-level evidence kind.
    pub provenancekind: String,
}
104
/// Event source identifier for taudit; written to the CloudEvents `source`
/// attribute of every emitted event.
const EVENT_SOURCE: &str = "taudit";
/// Value written to the `provenancerepo` extension attribute.
const PROVENANCE_REPO: &str = "taudit";
/// Value written to the `provenanceproducer` extension attribute.
const PROVENANCE_PRODUCER: &str = "taudit-sink-cloudevents";
/// Value written to the `provenancekind` extension attribute.
const PROVENANCE_KIND: &str = "finding";
110
/// Returns `true` when `value` is the literal prefix `sha256:` followed by
/// exactly 64 ASCII hex digits (either case). Anything else — a missing or
/// differently-cased prefix, a wrong length, a non-hex byte — yields `false`.
fn is_sha256_prefixed_digest(value: &str) -> bool {
    match value.strip_prefix("sha256:") {
        // A byte-wise scan is sufficient: any non-ASCII byte fails the
        // hex-digit test, so a 64-byte tail that passes is 64 ASCII chars.
        Some(digest) => digest.len() == 64 && digest.bytes().all(|byte| byte.is_ascii_hexdigit()),
        None => false,
    }
}
117
118fn derive_pipeline_id(graph: &AuthorityGraph) -> String {
119    // Prefer explicit identity hashes if upstream code stamped them into graph
120    // metadata. Fall back to deterministic graph identity material derivation
121    // so sink-only call sites still emit a stable pipeline identifier.
122    let hash = ["pipeline_content_hash", "pipeline_identity_material_hash"]
123        .iter()
124        .filter_map(|key| graph.metadata.get(*key))
125        .find(|value| is_sha256_prefixed_digest(value))
126        .cloned()
127        .unwrap_or_else(|| compute_pipeline_identity_material_hash(graph));
128
129    format!("urn:taudit:pipeline:{hash}")
130}
131
/// Map a FindingCategory to a CloudEvents type string.
///
/// Returns `io.taudit.finding.<suffix>`, where `<suffix>` is the snake_case
/// literal chosen for the variant in the `match` below. The suffixes are part
/// of the emitted wire format (they become the CloudEvents `type`), so
/// changing one changes what downstream consumers see.
///
/// The `match` has no wildcard arm: every variant is listed explicitly.
fn event_type(category: FindingCategory) -> String {
    let suffix = match category {
        FindingCategory::AuthorityPropagation => "authority_propagation",
        FindingCategory::OverPrivilegedIdentity => "over_privileged_identity",
        FindingCategory::UnpinnedAction => "unpinned_action",
        FindingCategory::UntrustedWithAuthority => "untrusted_with_authority",
        FindingCategory::ArtifactBoundaryCrossing => "artifact_boundary_crossing",
        FindingCategory::FloatingImage => "floating_image",
        FindingCategory::LongLivedCredential => "long_lived_credential",
        FindingCategory::PersistedCredential => "persisted_credential",
        FindingCategory::TriggerContextMismatch => "trigger_context_mismatch",
        FindingCategory::CrossWorkflowAuthorityChain => "cross_workflow_authority_chain",
        FindingCategory::AuthorityCycle => "authority_cycle",
        FindingCategory::UpliftWithoutAttestation => "uplift_without_attestation",
        FindingCategory::SelfMutatingPipeline => "self_mutating_pipeline",
        FindingCategory::CheckoutSelfPrExposure => "checkout_self_pr_exposure",
        FindingCategory::VariableGroupInPrJob => "variable_group_in_pr_job",
        FindingCategory::SelfHostedPoolPrHijack => "self_hosted_pool_pr_hijack",
        FindingCategory::ServiceConnectionScopeMismatch => "service_connection_scope_mismatch",
        FindingCategory::TemplateExtendsUnpinnedBranch => "template_extends_unpinned_branch",
        FindingCategory::TemplateRepoRefIsFeatureBranch => "template_repo_ref_is_feature_branch",
        FindingCategory::VmRemoteExecViaPipelineSecret => "vm_remote_exec_via_pipeline_secret",
        FindingCategory::ShortLivedSasInCommandLine => "short_lived_sas_in_command_line",
        FindingCategory::SecretToInlineScriptEnvExport => "secret_to_inline_script_env_export",
        FindingCategory::SecretMaterialisedToWorkspaceFile => {
            "secret_materialised_to_workspace_file"
        }
        FindingCategory::KeyVaultSecretToPlaintext => "keyvault_secret_to_plaintext",
        FindingCategory::TerraformAutoApproveInProd => "terraform_auto_approve_in_prod",
        FindingCategory::AddSpnWithInlineScript => "add_spn_with_inline_script",
        FindingCategory::ParameterInterpolationIntoShell => "parameter_interpolation_into_shell",
        FindingCategory::RuntimeScriptFetchedFromFloatingUrl => {
            "runtime_script_fetched_from_floating_url"
        }
        FindingCategory::PrTriggerWithFloatingActionRef => "pr_trigger_with_floating_action_ref",
        FindingCategory::UntrustedApiResponseToEnvSink => "untrusted_api_response_to_env_sink",
        FindingCategory::PrBuildPushesImageWithFloatingCredentials => {
            "pr_build_pushes_image_with_floating_credentials"
        }
        FindingCategory::SecretViaEnvGateToUntrustedConsumer => {
            "secret_via_env_gate_to_untrusted_consumer"
        }
        FindingCategory::NoWorkflowLevelPermissionsBlock => "no_workflow_level_permissions_block",
        FindingCategory::ProdDeployJobNoEnvironmentGate => "prod_deploy_job_no_environment_gate",
        FindingCategory::LongLivedSecretWithoutOidcRecommendation => {
            "long_lived_secret_without_oidc_recommendation"
        }
        FindingCategory::PullRequestWorkflowInconsistentForkCheck => {
            "pull_request_workflow_inconsistent_fork_check"
        }
        FindingCategory::GitlabDeployJobMissingProtectedBranchOnly => {
            "gitlab_deploy_job_missing_protected_branch_only"
        }
        FindingCategory::TerraformOutputViaSetvariableShellExpansion => {
            "terraform_output_via_setvariable_shell_expansion"
        }
        FindingCategory::RiskyTriggerWithAuthority => "risky_trigger_with_authority",
        FindingCategory::SensitiveValueInJobOutput => "sensitive_value_in_job_output",
        FindingCategory::ManualDispatchInputToUrlOrCommand => {
            "manual_dispatch_input_to_url_or_command"
        }
        FindingCategory::SecretsInheritOverscopedPassthrough => {
            "secrets_inherit_overscoped_passthrough"
        }
        FindingCategory::UnsafePrArtifactInWorkflowRunConsumer => {
            "unsafe_pr_artifact_in_workflow_run_consumer"
        }
        FindingCategory::ScriptInjectionViaUntrustedContext => {
            "script_injection_via_untrusted_context"
        }
        FindingCategory::InteractiveDebugActionInAuthorityWorkflow => {
            "interactive_debug_action_in_authority_workflow"
        }
        FindingCategory::PrSpecificCacheKeyInDefaultBranchConsumer => {
            "pr_specific_cache_key_in_default_branch_consumer"
        }
        FindingCategory::GhCliWithDefaultTokenEscalating => "gh_cli_with_default_token_escalating",
        FindingCategory::GhaScriptInjectionToPrivilegedShell => {
            "gha_script_injection_to_privileged_shell"
        }
        FindingCategory::GhaWorkflowRunArtifactPoisoningToPrivilegedConsumer => {
            "gha_workflow_run_artifact_poisoning_to_privileged_consumer"
        }
        FindingCategory::GhaRemoteScriptInAuthorityJob => "gha_remote_script_in_authority_job",
        FindingCategory::GhaPatRemoteUrlWrite => "gha_pat_remote_url_write",
        FindingCategory::GhaIssueCommentCommandToWriteToken => {
            "gha_issue_comment_command_to_write_token"
        }
        FindingCategory::GhaPrBuildPushesPublishableImage => {
            "gha_pr_build_pushes_publishable_image"
        }
        FindingCategory::GhaManualDispatchRefToPrivilegedCheckout => {
            "gha_manual_dispatch_ref_to_privileged_checkout"
        }
        FindingCategory::CiJobTokenToExternalApi => "ci_job_token_to_external_api",
        FindingCategory::IdTokenAudienceOverscoped => "id_token_audience_overscoped",
        FindingCategory::UntrustedCiVarInShellInterpolation => {
            "untrusted_ci_var_in_shell_interpolation"
        }
        FindingCategory::UnpinnedIncludeRemoteOrBranchRef => {
            "unpinned_include_remote_or_branch_ref"
        }
        FindingCategory::DindServiceGrantsHostAuthority => "dind_service_grants_host_authority",
        FindingCategory::SecurityJobSilentlySkipped => "security_job_silently_skipped",
        FindingCategory::ChildPipelineTriggerInheritsAuthority => {
            "child_pipeline_trigger_inherits_authority"
        }
        FindingCategory::CacheKeyCrossesTrustBoundary => "cache_key_crosses_trust_boundary",
        FindingCategory::PatEmbeddedInGitRemoteUrl => "pat_embedded_in_git_remote_url",
        FindingCategory::CiTokenTriggersDownstreamWithVariablePassthrough => {
            "ci_token_triggers_downstream_with_variable_passthrough"
        }
        FindingCategory::DotenvArtifactFlowsToPrivilegedDeployment => {
            "dotenv_artifact_flows_to_privileged_deployment"
        }
        FindingCategory::SharedSelfHostedPoolNoIsolation => "shared_self_hosted_pool_no_isolation",
        FindingCategory::SetvariableIssecretFalse => "setvariable_issecret_false",
        FindingCategory::HomoglyphInActionRef => "homoglyph_in_action_ref",
        FindingCategory::GhaHelperPathSensitiveArgv => "gha_helper_path_sensitive_argv",
        FindingCategory::GhaHelperPathSensitiveStdin => "gha_helper_path_sensitive_stdin",
        FindingCategory::GhaHelperPathSensitiveEnv => "gha_helper_path_sensitive_env",
        FindingCategory::GhaPostAmbientEnvCleanupPath => "gha_post_ambient_env_cleanup_path",
        FindingCategory::GhaActionMintedSecretToHelper => "gha_action_minted_secret_to_helper",
        FindingCategory::GhaHelperUntrustedPathResolution => "gha_helper_untrusted_path_resolution",
        FindingCategory::GhaSecretOutputAfterHelperLogin => "gha_secret_output_after_helper_login",
        FindingCategory::LaterSecretMaterializedAfterPathMutation => {
            "later_secret_materialized_after_path_mutation"
        }
        FindingCategory::GhaSetupNodeCacheHelperPathHandoff => {
            "gha_setup_node_cache_helper_path_handoff"
        }
        FindingCategory::GhaSetupPythonCacheHelperPathHandoff => {
            "gha_setup_python_cache_helper_path_handoff"
        }
        FindingCategory::GhaSetupPythonPipInstallAuthorityEnv => {
            "gha_setup_python_pip_install_authority_env"
        }
        FindingCategory::GhaSetupGoCacheHelperPathHandoff => {
            "gha_setup_go_cache_helper_path_handoff"
        }
        FindingCategory::GhaDockerSetupQemuPrivilegedDockerHelper => {
            "gha_docker_setup_qemu_privileged_docker_helper"
        }
        FindingCategory::GhaToolInstallerThenShellHelperAuthority => {
            "gha_tool_installer_then_shell_helper_authority"
        }
        FindingCategory::GhaWorkflowShellAuthorityConcentration => {
            "gha_workflow_shell_authority_concentration"
        }
        FindingCategory::GhaActionTokenEnvBeforeBareDownloadHelper => {
            "gha_action_token_env_before_bare_download_helper"
        }
        FindingCategory::GhaPostActionInputRetargetToCacheSave => {
            "gha_post_action_input_retarget_to_cache_save"
        }
        FindingCategory::GhaTerraformWrapperSensitiveOutput => {
            "gha_terraform_wrapper_sensitive_output"
        }
        FindingCategory::GhaCompositeBareHelperAfterPathInstallWithSecretEnv => {
            "gha_composite_bare_helper_after_path_install_with_secret_env"
        }
        FindingCategory::GhaPulumiPathResolvedCliWithAuthority => {
            "gha_pulumi_path_resolved_cli_with_authority"
        }
        FindingCategory::GhaPypiPublishOidcAfterPathMutation => {
            "gha_pypi_publish_oidc_after_path_mutation"
        }
        FindingCategory::GhaChangesetsPublishCommandWithAuthority => {
            "gha_changesets_publish_command_with_authority"
        }
        FindingCategory::GhaRubygemsReleaseGitTokenAndOidcHelper => {
            "gha_rubygems_release_git_token_and_oidc_helper"
        }
        FindingCategory::GhaCompositeEntrypointPathShadowWithSecretEnv => {
            "gha_composite_entrypoint_path_shadow_with_secret_env"
        }
        FindingCategory::GhaDockerBuildxAuthorityPathHandoff => {
            "gha_docker_buildx_authority_path_handoff"
        }
        FindingCategory::GhaGoogleDeployGcloudCredentialPath => {
            "gha_google_deploy_gcloud_credential_path"
        }
        FindingCategory::GhaDatadogTestVisibilityInstallerAuthority => {
            "gha_datadog_test_visibility_installer_authority"
        }
        FindingCategory::GhaKubernetesHelperKubeconfigAuthority => {
            "gha_kubernetes_helper_kubeconfig_authority"
        }
        FindingCategory::GhaAzureCompanionHelperAuthority => "gha_azure_companion_helper_authority",
        FindingCategory::GhaCreatePrGitTokenPathHandoff => "gha_create_pr_git_token_path_handoff",
        FindingCategory::GhaImportGpgPrivateKeyHelperPath => {
            "gha_import_gpg_private_key_helper_path"
        }
        FindingCategory::GhaSshAgentPrivateKeyToPathHelper => {
            "gha_ssh_agent_private_key_to_path_helper"
        }
        FindingCategory::GhaMacosCodesignCertSecurityPath => {
            "gha_macos_codesign_cert_security_path"
        }
        FindingCategory::GhaPagesDeployTokenUrlToGitHelper => {
            "gha_pages_deploy_token_url_to_git_helper"
        }
        FindingCategory::GhaWorkflowRunArtifactMetadataToPrivilegedApi => {
            "gha_workflow_run_artifact_metadata_to_privileged_api"
        }
        FindingCategory::GhaWorkflowRunArtifactReportToPrComment => {
            "gha_workflow_run_artifact_report_to_pr_comment"
        }
        FindingCategory::GhaWorkflowRunArtifactToBuildScanPublish => {
            "gha_workflow_run_artifact_to_build_scan_publish"
        }
        FindingCategory::GhaFloatingRemoteScriptBeforePublishSink => {
            "gha_floating_remote_script_before_publish_sink"
        }
        FindingCategory::GhaTokenRemoteUrlWithTraceOrProcessExposure => {
            "gha_token_remote_url_with_trace_or_process_exposure"
        }
        FindingCategory::GhaEnvCredentialHelperConfigRedirectBeforeAuthority => {
            "gha_env_credential_helper_config_redirect_before_authority"
        }
        FindingCategory::GhaEnvNodeOptionsCodeInjectionBeforeNodeAuthority => {
            "gha_env_node_options_code_injection_before_node_authority"
        }
        FindingCategory::GhaEnvDyldOrLdLibraryPathBeforeCredentialHelper => {
            "gha_env_dyld_or_ld_library_path_before_credential_helper"
        }
        FindingCategory::GhaWorkflowCallContainerImageInputSecretsInherit => {
            "gha_workflow_call_container_image_input_secrets_inherit"
        }
        FindingCategory::GhaWorkflowCallRunnerLabelInputPrivilegeEscalation => {
            "gha_workflow_call_runner_label_input_privilege_escalation"
        }
        FindingCategory::GhaContainerImageAttackerInfluencedWithSecretEnv => {
            "gha_container_image_attacker_influenced_with_secret_env"
        }
        FindingCategory::GhaAttestationSubjectDigestFromStepOutputUnverified => {
            "gha_attestation_subject_digest_from_step_output_unverified"
        }
        FindingCategory::GhaAttestationSubjectPathWorkspaceGlobWithPrTrigger => {
            "gha_attestation_subject_path_workspace_glob_with_pr_trigger"
        }
        FindingCategory::GhaAttestationConfigDrivenGateFromWorkspaceFile => {
            "gha_attestation_config_driven_gate_from_workspace_file"
        }
        FindingCategory::GhaTelemetryPrOrIssueTextToExternalSink => {
            "gha_telemetry_pr_or_issue_text_to_external_sink"
        }
        FindingCategory::GhaTelemetryDebugFlagWithSecretEnv => {
            "gha_telemetry_debug_flag_with_secret_env"
        }
        FindingCategory::GhaTelemetryAutonomousAgentInputFromUntrustedEvent => {
            "gha_telemetry_autonomous_agent_input_from_untrusted_event"
        }
        FindingCategory::GhaWorkflowRunArtifactToBlobStorageToken => {
            "gha_workflow_run_artifact_to_blob_storage_token"
        }
        FindingCategory::GhaApiWorkflowRunArtifactToAutonomousAgentToGitPush => {
            "gha_api_workflow_run_artifact_to_autonomous_agent_to_git_push"
        }
        FindingCategory::GhaManifestNpmLifecycleHookPrTriggerWithToken => {
            "gha_manifest_npm_lifecycle_hook_pr_trigger_with_token"
        }
        FindingCategory::GhaManifestPythonMBuildWithPrCredentials => {
            "gha_manifest_python_m_build_with_pr_credentials"
        }
        FindingCategory::GhaManifestCargoBuildRsPullRequestWithToken => {
            "gha_manifest_cargo_build_rs_pull_request_with_token"
        }
        FindingCategory::GhaManifestMakefileWithPrTriggerAndSecrets => {
            "gha_manifest_makefile_with_pr_trigger_and_secrets"
        }
        FindingCategory::GhaManifestSubmodulesRecursiveWithPrAuthority => {
            "gha_manifest_submodules_recursive_with_pr_authority"
        }
        FindingCategory::GhaCrossrepoWorkflowCallFloatingRefCascade => {
            "gha_crossrepo_workflow_call_floating_ref_cascade"
        }
        FindingCategory::GhaCrossrepoSecretsInheritUnreviewedCallee => {
            "gha_crossrepo_secrets_inherit_unreviewed_callee"
        }
        FindingCategory::GhaToolcacheAbsolutePathDowngrade => {
            "gha_toolcache_absolute_path_downgrade"
        }
        // NOTE(review): the two arms below silence the `deprecated` lint,
        // so these variants are presumably marked `#[deprecated]` in
        // `taudit_core` — confirm. They are still mapped so the match stays
        // exhaustive and such findings keep a type string.
        #[allow(deprecated)]
        FindingCategory::EgressBlindspot => "egress_blindspot",
        #[allow(deprecated)]
        FindingCategory::MissingAuditTrail => "missing_audit_trail",
    };
    // Common reverse-DNS style prefix shared by every finding event type.
    format!("io.taudit.finding.{suffix}")
}
423
424/// Build a CloudEvents 1.0 envelope for a single finding.
425fn finding_to_event(
426    finding: &Finding,
427    graph: &AuthorityGraph,
428    correlation_id: &str,
429    pipeline_id: &str,
430    scan_run_id: &str,
431) -> CloudEventV1 {
432    let data = serde_json::to_value(finding)
433        .unwrap_or_else(|_| serde_json::Value::String(finding.message.clone()));
434
435    let completeness_str = match graph.completeness {
436        taudit_core::graph::AuthorityCompleteness::Complete => "complete",
437        taudit_core::graph::AuthorityCompleteness::Partial => "partial",
438        taudit_core::graph::AuthorityCompleteness::Unknown => "unknown",
439    };
440
441    // Surface the resolved CI/CD platform as an extension attribute when the
442    // parser stamped `metadata["platform"]`. Permitted values: "ado", "gha",
443    // "gitlab". Anything else is dropped — better to omit than to ship a
444    // value SIEM rules can't pattern-match on.
445    let tauditplatform = graph
446        .metadata
447        .get("platform")
448        .and_then(|v| match v.as_str() {
449            "ado" | "gha" | "gitlab" => Some(v.clone()),
450            _ => None,
451        });
452
453    // Pair each typed `GapKind` with its prose reason. `completeness_gap_kinds`
454    // and `completeness_gaps` are append-only parallel vectors maintained by
455    // `AuthorityGraph::mark_partial`, so `.zip` yields one entry per gap with
456    // the original ordering preserved. Emit as `Some(vec)` only when the graph
457    // actually has gaps — `skip_serializing_if` then drops the attribute on
458    // Complete / Unknown graphs (no null, no empty `[]`).
459    let tauditcompletenessgaps = if graph.completeness_gap_kinds.is_empty() {
460        None
461    } else {
462        Some(
463            graph
464                .completeness_gap_kinds
465                .iter()
466                .zip(graph.completeness_gaps.iter())
467                .map(|(kind, reason)| serde_json::json!({"kind": kind, "reason": reason}))
468                .collect::<Vec<_>>(),
469        )
470    };
471
472    CloudEventV1 {
473        specversion: "1.0".into(),
474        id: uuid::Uuid::new_v4().to_string(),
475        source: EVENT_SOURCE.into(),
476        ty: event_type(finding.category),
477        subject: Some(graph.source.file.clone()),
478        datacontenttype: Some("application/json".into()),
479        time: Some(chrono::Utc::now().to_rfc3339()),
480        data: Some(data),
481        tauditcompleteness: Some(completeness_str.into()),
482        tauditcompletenessgaps,
483        tauditfindingfingerprint: compute_fingerprint(finding, graph),
484        tauditsuppressionkey: compute_suppression_key(finding, graph),
485        tauditruleid: rule_id_for(finding),
486        tauditplatform,
487        tauditfindinggroup: finding
488            .extras
489            .finding_group_id
490            .clone()
491            .unwrap_or_else(|| compute_finding_group_id(&compute_fingerprint(finding, graph))),
492        correlationid: correlation_id.to_string(),
493        tauditpipelineid: pipeline_id.to_string(),
494        tauditscanrunid: scan_run_id.to_string(),
495        provenancerepo: PROVENANCE_REPO.into(),
496        provenanceproducer: PROVENANCE_PRODUCER.into(),
497        provenanceversion: env!("CARGO_PKG_VERSION").into(),
498        provenancekind: PROVENANCE_KIND.into(),
499    }
500}
501
502// ---------------------------------------------------------------------------
503// ReportSink implementation — one JSONL line per finding.
504// ---------------------------------------------------------------------------
505
/// Environment variable used as the inbound channel for a caller-supplied
/// correlation id. Set by CellOS supervisor or CI runners to thread the
/// caller's `correlationId` through to every emitted CloudEvent in this
/// scan, so the taudit findings can be joined to the upstream run that
/// triggered them. Falls back to a freshly minted `Uuid::new_v4()` when
/// unset.
///
/// Precedence (highest → lowest):
///   1. `CloudEventsJsonlSink { correlation_id: Some(non_empty) }` —
///      explicit constructor argument wins (programmatic embedders).
///   2. Non-empty `TAUDIT_CORRELATION_ID` env var — for CLI / CI use.
///   3. `Uuid::new_v4()` — preserves prior behaviour for unconfigured callers.
///
/// "Non-empty" means the value still has content after trimming whitespace;
/// a whitespace-only value is treated as unset at both levels 1 and 2.
pub const CORRELATION_ID_ENV: &str = "TAUDIT_CORRELATION_ID";
/// Environment variable used as the inbound channel for a caller-supplied
/// scan-run id. This is distinct from `TAUDIT_CORRELATION_ID`: one operator
/// flow may execute multiple scans, each with a distinct scan run id.
///
/// Resolution mirrors the correlation id: a non-blank explicit constructor
/// argument wins, then a non-blank `TAUDIT_SCAN_RUN_ID` env var, then a
/// freshly minted `Uuid::new_v4()`.
pub const SCAN_RUN_ID_ENV: &str = "TAUDIT_SCAN_RUN_ID";
523
/// JSONL CloudEvents sink — one event per finding.
///
/// Construct with `CloudEventsJsonlSink::default()` (or `::new()`) for the
/// historical "mint a fresh UUID per emit" behaviour, or with
/// `CloudEventsJsonlSink::with_correlation_id(Some("…".to_string()))` to
/// thread a caller-supplied correlation id through every event in the
/// emission. Use `CloudEventsJsonlSink::with_ids` to also pin the scan-run
/// id.
#[derive(Debug, Default, Clone)]
pub struct CloudEventsJsonlSink {
    /// Caller-supplied correlation id. When `Some` and not blank
    /// (whitespace-only counts as blank), takes precedence over the
    /// `TAUDIT_CORRELATION_ID` env var and the minted UUID fallback.
    correlation_id: Option<String>,
    /// Caller-supplied scan-run id. When `Some` and not blank
    /// (whitespace-only counts as blank), takes precedence over the
    /// `TAUDIT_SCAN_RUN_ID` env var and the minted UUID fallback.
    scan_run_id: Option<String>,
}
539
540impl CloudEventsJsonlSink {
541    /// Construct a sink with the default (unconfigured) correlation source —
542    /// equivalent to `CloudEventsJsonlSink::default()`.
543    pub fn new() -> Self {
544        Self::default()
545    }
546
547    /// Construct a sink with an explicit caller-supplied correlation id.
548    /// `Some(non_empty_id)` overrides both the `TAUDIT_CORRELATION_ID` env
549    /// var and the UUID fallback; `None` or an empty string defers to env var,
550    /// then UUID.
551    pub fn with_correlation_id(correlation_id: Option<String>) -> Self {
552        Self {
553            correlation_id,
554            scan_run_id: None,
555        }
556    }
557
558    /// Construct a sink with explicit caller-supplied identifiers.
559    ///
560    /// `correlation_id` and `scan_run_id` each override their corresponding
561    /// env var and UUID fallback when set.
562    pub fn with_ids(correlation_id: Option<String>, scan_run_id: Option<String>) -> Self {
563        Self {
564            correlation_id,
565            scan_run_id,
566        }
567    }
568
569    /// Resolve the correlation id for one `emit` call using the documented
570    /// precedence: explicit ctor arg → `TAUDIT_CORRELATION_ID` env var →
571    /// minted `Uuid::new_v4()`.
572    fn resolve_correlation_id(&self) -> String {
573        self.correlation_id
574            .clone()
575            .and_then(non_empty_env_value)
576            .or_else(|| {
577                std::env::var(CORRELATION_ID_ENV)
578                    .ok()
579                    .and_then(non_empty_env_value)
580            })
581            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string())
582    }
583
584    /// Resolve the scan run id for one `emit` call using the documented
585    /// precedence: explicit ctor arg → `TAUDIT_SCAN_RUN_ID` env var →
586    /// minted `Uuid::new_v4()`.
587    fn resolve_scan_run_id(&self) -> String {
588        self.scan_run_id
589            .clone()
590            .and_then(non_empty_env_value)
591            .or_else(|| {
592                std::env::var(SCAN_RUN_ID_ENV)
593                    .ok()
594                    .and_then(non_empty_env_value)
595            })
596            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string())
597    }
598}
599
/// Pass `value` through unchanged when it has any non-whitespace content;
/// return `None` for an empty or whitespace-only string. The returned string
/// is NOT trimmed — only the emptiness check looks at the trimmed form.
fn non_empty_env_value(value: String) -> Option<String> {
    Some(value).filter(|candidate| !candidate.trim().is_empty())
}
607
608impl<W: std::io::Write> ReportSink<W> for CloudEventsJsonlSink {
609    fn emit(
610        &self,
611        w: &mut W,
612        graph: &AuthorityGraph,
613        findings: &[Finding],
614    ) -> Result<(), TauditError> {
615        let correlation_id = self.resolve_correlation_id();
616        let scan_run_id = self.resolve_scan_run_id();
617        let pipeline_id = derive_pipeline_id(graph);
618
619        for finding in findings {
620            let event =
621                finding_to_event(finding, graph, &correlation_id, &pipeline_id, &scan_run_id);
622            serde_json::to_writer(&mut *w, &event)
623                .map_err(|e| TauditError::Report(format!("CloudEvents serialization: {e}")))?;
624            writeln!(w).map_err(|e| TauditError::Report(e.to_string()))?;
625        }
626
627        Ok(())
628    }
629}
630
631// ---------------------------------------------------------------------------
632// Tests
633// ---------------------------------------------------------------------------
634
635#[cfg(test)]
636mod tests {
637    use super::*;
638    use std::{
639        fs,
640        path::PathBuf,
641        sync::{Mutex, OnceLock},
642    };
643    use taudit_core::finding::{FindingExtras, Recommendation, Severity};
644    use taudit_core::graph::{GapKind, PipelineSource};
645
646    fn test_source() -> PipelineSource {
647        PipelineSource {
648            file: ".github/workflows/ci.yml".into(),
649            repo: None,
650            git_ref: None,
651            commit_sha: None,
652        }
653    }
654
655    fn test_finding(category: FindingCategory, severity: Severity) -> Finding {
656        Finding {
657            severity,
658            category,
659            path: None,
660            nodes_involved: vec![0, 1],
661            message: "test finding".into(),
662            recommendation: Recommendation::Manual {
663                action: "fix it".into(),
664            },
665            source: taudit_core::finding::FindingSource::BuiltIn,
666            extras: FindingExtras::default(),
667        }
668    }
669
670    fn read_json(relative: &str) -> serde_json::Value {
671        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
672            .join("../..")
673            .join(relative);
674        let text = fs::read_to_string(&path)
675            .unwrap_or_else(|err| panic!("failed to read {}: {err}", path.display()));
676        serde_json::from_str(&text)
677            .unwrap_or_else(|err| panic!("failed to parse {}: {err}", path.display()))
678    }
679
/// Process-wide mutex used by the tests that mutate environment variables.
/// The mutex is created on first use and the same instance is returned on
/// every call.
fn env_lock() -> &'static Mutex<()> {
    static ENV_MUTEX: OnceLock<Mutex<()>> = OnceLock::new();
    ENV_MUTEX.get_or_init(Mutex::default)
}
684
685    fn env_guard() -> std::sync::MutexGuard<'static, ()> {
686        env_lock()
687            .lock()
688            .unwrap_or_else(|poisoned| poisoned.into_inner())
689    }
690
691    fn cleanup_correlation_env() {
692        unsafe {
693            std::env::remove_var(CORRELATION_ID_ENV);
694        }
695    }
696
697    fn cleanup_scan_run_env() {
698        unsafe {
699            std::env::remove_var(SCAN_RUN_ID_ENV);
700        }
701    }
702
/// Two findings in, two lines out.
#[test]
fn emits_one_jsonl_line_per_finding() {
    let findings = vec![
        test_finding(FindingCategory::UnpinnedAction, Severity::Medium),
        test_finding(FindingCategory::AuthorityPropagation, Severity::Critical),
    ];
    let graph = AuthorityGraph::new(test_source());

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = String::from_utf8(raw).unwrap();
    let line_count = rendered.lines().count();
    assert_eq!(line_count, 2, "one JSONL line per finding");
}
720
/// The emitted line parses as JSON and carries the CloudEvents core
/// attributes together with the taudit and provenance extension attributes.
#[test]
fn each_line_is_valid_cloudevent() {
    let findings = vec![test_finding(
        FindingCategory::OverPrivilegedIdentity,
        Severity::High,
    )];
    let graph = AuthorityGraph::new(test_source());

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = String::from_utf8(raw).unwrap();
    let first_line = rendered.lines().next().unwrap();
    let event: serde_json::Value = serde_json::from_str(first_line).unwrap();

    // Core CloudEvents attributes.
    assert_eq!(event["specversion"], "1.0");
    assert_eq!(event["source"], "taudit");
    assert_eq!(event["type"], "io.taudit.finding.over_privileged_identity");
    assert_eq!(event["subject"], ".github/workflows/ci.yml");
    assert_eq!(event["datacontenttype"], "application/json");
    assert!(event["id"].is_string());
    assert!(event["time"].is_string());
    assert!(event["data"].is_object());

    // taudit extension attributes.
    assert_eq!(event["tauditcompleteness"], "complete");
    assert!(event["correlationid"].is_string());
    assert!(event["tauditpipelineid"].is_string());
    assert!(event["tauditscanrunid"].is_string());

    // Provenance extension attributes.
    assert_eq!(event["provenancerepo"], "taudit");
    assert_eq!(event["provenanceproducer"], "taudit-sink-cloudevents");
    assert_eq!(event["provenancekind"], "finding");
}
754
/// A graph marked partial yields `tauditcompleteness = "partial"` and a
/// structured `tauditcompletenessgaps` entry for the recorded gap.
#[test]
fn partial_graph_sets_completeness_extension() {
    let gap_reason = "composite action ref unresolved at scan time";

    // `Structural` stands for the more impactful failure class (unresolvable
    // composite / reusable-workflow / extends / include). It is also a
    // different `GapKind` variant than the parser-side tests use, so this
    // regression widens coverage of the enum.
    let mut graph = AuthorityGraph::new(test_source());
    graph.mark_partial(GapKind::Structural, gap_reason);

    let findings = vec![test_finding(
        FindingCategory::AuthorityPropagation,
        Severity::Critical,
    )];

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = String::from_utf8(raw).unwrap();
    let first_line = rendered.lines().next().unwrap();
    let event: serde_json::Value = serde_json::from_str(first_line).unwrap();

    assert_eq!(event["tauditcompleteness"], "partial");

    // The structured extension must expose the typed kind next to the prose
    // reason. Checking both fields of entry [0] confirms kind and reason
    // stay paired and that `GapKind` is serialised in snake_case.
    let gaps = event["tauditcompletenessgaps"]
        .as_array()
        .expect("tauditcompletenessgaps must be an array on Partial graphs");
    assert_eq!(gaps.len(), 1, "exactly one gap was recorded");

    let gap = &gaps[0];
    assert_eq!(
        gap["kind"], "structural",
        "GapKind::Structural must serialize as snake_case `structural`",
    );
    assert_eq!(
        gap["reason"], "composite action ref unresolved at scan time",
        "original reason string must be preserved verbatim",
    );
}
799
/// The `data` member mirrors the finding's severity, category, message and
/// recommendation.
#[test]
fn data_payload_contains_finding_fields() {
    let findings = vec![test_finding(
        FindingCategory::LongLivedCredential,
        Severity::Low,
    )];
    let graph = AuthorityGraph::new(test_source());

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = String::from_utf8(raw).unwrap();
    let first_line = rendered.lines().next().unwrap();
    let event: serde_json::Value = serde_json::from_str(first_line).unwrap();

    let payload = &event["data"];
    assert_eq!(payload["severity"], "low");
    assert_eq!(payload["category"], "long_lived_credential");
    assert_eq!(payload["message"], "test finding");
    assert!(payload["recommendation"].is_object());
}
823
/// Each listed category maps to `io.taudit.finding.<snake_case category>`.
#[test]
fn event_type_maps_all_categories() {
    let expectations = [
        (
            FindingCategory::AuthorityPropagation,
            "io.taudit.finding.authority_propagation",
        ),
        (
            FindingCategory::OverPrivilegedIdentity,
            "io.taudit.finding.over_privileged_identity",
        ),
        (
            FindingCategory::UnpinnedAction,
            "io.taudit.finding.unpinned_action",
        ),
        (
            FindingCategory::UntrustedWithAuthority,
            "io.taudit.finding.untrusted_with_authority",
        ),
        (
            FindingCategory::ArtifactBoundaryCrossing,
            "io.taudit.finding.artifact_boundary_crossing",
        ),
        (
            FindingCategory::EgressBlindspot,
            "io.taudit.finding.egress_blindspot",
        ),
        (
            FindingCategory::MissingAuditTrail,
            "io.taudit.finding.missing_audit_trail",
        ),
        (
            FindingCategory::FloatingImage,
            "io.taudit.finding.floating_image",
        ),
        (
            FindingCategory::LongLivedCredential,
            "io.taudit.finding.long_lived_credential",
        ),
    ];

    expectations
        .into_iter()
        .for_each(|(category, expected)| assert_eq!(event_type(category), expected));
}
869
/// A freshly emitted event validates against the checked-in CloudEvent
/// finding schema.
#[test]
fn emitted_event_matches_cloudevent_schema() {
    let findings = vec![test_finding(
        FindingCategory::AuthorityPropagation,
        Severity::Critical,
    )];
    let graph = AuthorityGraph::new(test_source());

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = String::from_utf8(raw).unwrap();
    let first_line = rendered.lines().next().unwrap();
    let event: serde_json::Value = serde_json::from_str(first_line).unwrap();

    let schema = read_json("contracts/schemas/taudit-cloudevent-finding-v1.schema.json");
    let validator =
        jsonschema::validator_for(&schema).expect("cloudevent schema should compile");

    let mut errors: Vec<String> = Vec::new();
    for violation in validator.iter_errors(&event) {
        errors.push(violation.to_string());
    }

    assert!(
        errors.is_empty(),
        "emitted event does not match CloudEvent schema:\n{}",
        errors.join("\n")
    );
}
899
/// The checked-in example event validates against the CloudEvent finding
/// schema.
#[test]
fn checked_in_example_matches_cloudevent_schema() {
    let schema = read_json("contracts/schemas/taudit-cloudevent-finding-v1.schema.json");
    let event = read_json("contracts/examples/over-privileged-finding.cloudevent.json");

    let validator =
        jsonschema::validator_for(&schema).expect("cloudevent schema should compile");

    let mut errors: Vec<String> = Vec::new();
    for violation in validator.iter_errors(&event) {
        errors.push(violation.to_string());
    }

    assert!(
        errors.is_empty(),
        "checked-in CloudEvent example does not match schema:\n{}",
        errors.join("\n")
    );
}
917
/// A freshly emitted event validates against the shared ecosystem evidence
/// envelope schema.
#[test]
fn emitted_event_matches_shared_envelope_schema() {
    let findings = vec![test_finding(
        FindingCategory::AuthorityPropagation,
        Severity::Critical,
    )];
    let graph = AuthorityGraph::new(test_source());

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = String::from_utf8(raw).unwrap();
    let first_line = rendered.lines().next().unwrap();
    let event: serde_json::Value = serde_json::from_str(first_line).unwrap();

    let schema = read_json("contracts/schemas/ecosystem-evidence-envelope-v0.schema.json");
    let validator =
        jsonschema::validator_for(&schema).expect("shared envelope schema should compile");

    let mut errors: Vec<String> = Vec::new();
    for violation in validator.iter_errors(&event) {
        errors.push(violation.to_string());
    }

    assert!(
        errors.is_empty(),
        "emitted event does not match shared envelope schema:\n{}",
        errors.join("\n")
    );
}
947
/// The checked-in CloudEvent example validates against the shared ecosystem
/// evidence envelope schema.
#[test]
fn checked_in_example_matches_shared_envelope_schema() {
    let schema = read_json("contracts/schemas/ecosystem-evidence-envelope-v0.schema.json");
    let event = read_json("contracts/examples/over-privileged-finding.cloudevent.json");

    let validator =
        jsonschema::validator_for(&schema).expect("shared envelope schema should compile");

    let mut errors: Vec<String> = Vec::new();
    for violation in validator.iter_errors(&event) {
        errors.push(violation.to_string());
    }

    assert!(
        errors.is_empty(),
        "checked-in CloudEvent example does not match shared envelope schema:\n{}",
        errors.join("\n")
    );
}
965
/// The checked-in shared-envelope example validates against its own schema.
#[test]
fn shared_envelope_example_matches_shared_envelope_schema() {
    let schema = read_json("contracts/schemas/ecosystem-evidence-envelope-v0.schema.json");
    let event = read_json("contracts/examples/ecosystem-evidence-envelope.example.json");

    let validator =
        jsonschema::validator_for(&schema).expect("shared envelope schema should compile");

    let mut errors: Vec<String> = Vec::new();
    for violation in validator.iter_errors(&event) {
        errors.push(violation.to_string());
    }

    assert!(
        errors.is_empty(),
        "checked-in shared envelope example does not match shared envelope schema:\n{}",
        errors.join("\n")
    );
}
983
/// Every event written by one `emit` call carries the same `correlationid`.
#[test]
fn findings_from_same_emit_share_correlation_id() {
    let findings = vec![
        test_finding(FindingCategory::UnpinnedAction, Severity::Medium),
        test_finding(FindingCategory::AuthorityPropagation, Severity::Critical),
    ];
    let graph = AuthorityGraph::new(test_source());

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = String::from_utf8(raw).unwrap();
    let mut seen: Vec<String> = Vec::new();
    for line in rendered.lines() {
        let event: serde_json::Value = serde_json::from_str(line).unwrap();
        seen.push(event["correlationid"].as_str().unwrap().to_string());
    }

    assert_eq!(seen.len(), 2);
    assert_eq!(seen[0], seen[1]);
}
1009
/// Every event written by one `emit` call carries the same
/// `tauditscanrunid`.
#[test]
fn findings_from_same_emit_share_scan_run_id() {
    let findings = vec![
        test_finding(FindingCategory::UnpinnedAction, Severity::Medium),
        test_finding(FindingCategory::AuthorityPropagation, Severity::Critical),
    ];
    let graph = AuthorityGraph::new(test_source());

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = String::from_utf8(raw).unwrap();
    let mut seen: Vec<String> = Vec::new();
    for line in rendered.lines() {
        let event: serde_json::Value = serde_json::from_str(line).unwrap();
        seen.push(event["tauditscanrunid"].as_str().unwrap().to_string());
    }

    assert_eq!(seen.len(), 2);
    assert_eq!(seen[0], seen[1]);
}
1034
/// A default-constructed sink picks up a non-blank value from the
/// correlation-id env var.
#[test]
fn correlation_id_uses_non_empty_env_value_when_set() {
    let _env = env_guard();
    cleanup_correlation_env();
    // SAFETY: the guard taken above keeps the other guard-holding tests from
    // touching the environment while this one mutates it.
    unsafe { std::env::set_var(CORRELATION_ID_ENV, "corr-from-env") };

    let resolved = CloudEventsJsonlSink::default().resolve_correlation_id();
    assert_eq!(resolved, "corr-from-env");

    cleanup_correlation_env();
}
1048
/// A whitespace-only env value is ignored and a UUID is minted instead.
#[test]
fn correlation_id_empty_env_value_falls_back_to_uuid() {
    let _env = env_guard();
    cleanup_correlation_env();
    // SAFETY: the guard taken above keeps the other guard-holding tests from
    // touching the environment while this one mutates it.
    unsafe { std::env::set_var(CORRELATION_ID_ENV, "   ") };

    let resolved = CloudEventsJsonlSink::default().resolve_correlation_id();
    let parsed = uuid::Uuid::parse_str(&resolved);
    assert!(
        parsed.is_ok(),
        "empty env must fall back to a minted UUID"
    );

    cleanup_correlation_env();
}
1066
/// With the env var removed, a default-constructed sink mints a UUID.
#[test]
fn correlation_id_unset_env_falls_back_to_uuid() {
    let _env = env_guard();
    cleanup_correlation_env();

    let resolved = CloudEventsJsonlSink::default().resolve_correlation_id();
    let parsed = uuid::Uuid::parse_str(&resolved);
    assert!(
        parsed.is_ok(),
        "unset env must fall back to a minted UUID"
    );
}
1079
/// A whitespace-only explicit correlation id does not shadow the env var.
#[test]
fn explicit_empty_correlation_id_falls_back_to_env() {
    let _env = env_guard();
    cleanup_correlation_env();
    // SAFETY: the guard taken above keeps the other guard-holding tests from
    // touching the environment while this one mutates it.
    unsafe { std::env::set_var(CORRELATION_ID_ENV, "corr-from-env") };

    let blank_explicit = Some("   ".into());
    let sink = CloudEventsJsonlSink::with_ids(blank_explicit, None);
    let resolved = sink.resolve_correlation_id();
    assert_eq!(resolved, "corr-from-env");

    cleanup_correlation_env();
}
1093
/// A default-constructed sink picks up a non-blank value from the
/// scan-run-id env var.
#[test]
fn scan_run_id_uses_non_empty_env_value_when_set() {
    let _env = env_guard();
    cleanup_scan_run_env();
    // SAFETY: the guard taken above keeps the other guard-holding tests from
    // touching the environment while this one mutates it.
    unsafe { std::env::set_var(SCAN_RUN_ID_ENV, "scan-run-from-env") };

    let resolved = CloudEventsJsonlSink::default().resolve_scan_run_id();
    assert_eq!(resolved, "scan-run-from-env");

    cleanup_scan_run_env();
}
1107
/// A whitespace-only env value is ignored and a UUID is minted instead.
#[test]
fn scan_run_id_empty_env_value_falls_back_to_uuid() {
    let _env = env_guard();
    cleanup_scan_run_env();
    // SAFETY: the guard taken above keeps the other guard-holding tests from
    // touching the environment while this one mutates it.
    unsafe { std::env::set_var(SCAN_RUN_ID_ENV, "   ") };

    let resolved = CloudEventsJsonlSink::default().resolve_scan_run_id();
    let parsed = uuid::Uuid::parse_str(&resolved);
    assert!(
        parsed.is_ok(),
        "empty env must fall back to a minted UUID"
    );

    cleanup_scan_run_env();
}
1125
/// With the env var removed, a default-constructed sink mints a UUID.
#[test]
fn scan_run_id_unset_env_falls_back_to_uuid() {
    let _env = env_guard();
    cleanup_scan_run_env();

    let resolved = CloudEventsJsonlSink::default().resolve_scan_run_id();
    let parsed = uuid::Uuid::parse_str(&resolved);
    assert!(
        parsed.is_ok(),
        "unset env must fall back to a minted UUID"
    );
}
1138
/// A whitespace-only explicit scan run id does not shadow the env var.
#[test]
fn explicit_empty_scan_run_id_falls_back_to_env() {
    let _env = env_guard();
    cleanup_scan_run_env();
    // SAFETY: the guard taken above keeps the other guard-holding tests from
    // touching the environment while this one mutates it.
    unsafe { std::env::set_var(SCAN_RUN_ID_ENV, "scan-run-from-env") };

    let blank_explicit = Some("   ".into());
    let sink = CloudEventsJsonlSink::with_ids(None, blank_explicit);
    let resolved = sink.resolve_scan_run_id();
    assert_eq!(resolved, "scan-run-from-env");

    cleanup_scan_run_env();
}
1152
/// When the graph metadata carries `pipeline_content_hash`, the pipeline id
/// is that hash prefixed with `urn:taudit:pipeline:`.
#[test]
fn pipeline_id_uses_metadata_pipeline_content_hash_when_present() {
    let findings = vec![test_finding(
        FindingCategory::AuthorityPropagation,
        Severity::High,
    )];

    let mut graph = AuthorityGraph::new(test_source());
    graph.metadata.insert(
        "pipeline_content_hash".into(),
        "sha256:0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".into(),
    );

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = std::str::from_utf8(&raw).unwrap();
    let first_line = rendered.lines().next().unwrap();
    let event: serde_json::Value = serde_json::from_str(first_line).unwrap();

    assert_eq!(
        event["tauditpipelineid"],
        "urn:taudit:pipeline:sha256:0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
    );
}
1178
/// Emitting the same graph twice through one sink yields the same
/// `tauditpipelineid` both times.
#[test]
fn pipeline_id_is_stable_across_emits_for_same_graph() {
    let findings = vec![test_finding(
        FindingCategory::AuthorityPropagation,
        Severity::High,
    )];
    let graph = AuthorityGraph::new(test_source());
    let sink = CloudEventsJsonlSink::default();

    let mut first_raw = Vec::new();
    sink.emit(&mut first_raw, &graph, &findings).unwrap();
    let mut second_raw = Vec::new();
    sink.emit(&mut second_raw, &graph, &findings).unwrap();

    let first_line = std::str::from_utf8(&first_raw)
        .unwrap()
        .lines()
        .next()
        .unwrap();
    let second_line = std::str::from_utf8(&second_raw)
        .unwrap()
        .lines()
        .next()
        .unwrap();
    let first_event: serde_json::Value = serde_json::from_str(first_line).unwrap();
    let second_event: serde_json::Value = serde_json::from_str(second_line).unwrap();

    assert_eq!(
        first_event["tauditpipelineid"],
        second_event["tauditpipelineid"]
    );
}
1202
/// With no findings, nothing at all is written to the output.
#[test]
fn empty_findings_produces_empty_output() {
    let graph = AuthorityGraph::new(test_source());
    let sink = CloudEventsJsonlSink::default();

    let mut raw = Vec::new();
    sink.emit(&mut raw, &graph, &[]).unwrap();

    assert!(raw.is_empty(), "no findings = no output");
}
1214
/// Each canonical platform token stored in graph metadata shows up
/// unchanged as the `tauditplatform` extension attribute.
#[test]
fn platform_metadata_surfaces_as_extension_attribute() {
    // taudit-cli writes the canonical short token into
    // `graph.metadata["platform"]` once the parser is resolved. The sink is
    // expected to copy it verbatim onto the envelope so SIEMs can route by
    // platform without re-parsing the subject.
    for token in ["ado", "gha", "gitlab"] {
        let findings = vec![test_finding(
            FindingCategory::AuthorityPropagation,
            Severity::High,
        )];
        let mut graph = AuthorityGraph::new(test_source());
        graph
            .metadata
            .insert("platform".to_string(), token.to_string());

        let mut raw = Vec::new();
        let sink = CloudEventsJsonlSink::default();
        sink.emit(&mut raw, &graph, &findings).unwrap();

        let rendered = std::str::from_utf8(&raw).unwrap();
        let first_line = rendered.lines().next().unwrap();
        let event: serde_json::Value = serde_json::from_str(first_line).unwrap();

        assert_eq!(
            event["tauditplatform"], token,
            "platform metadata must surface verbatim on the envelope"
        );
    }
}
1245
/// A graph without `platform` metadata produces an event with no
/// `tauditplatform` key at all.
#[test]
fn missing_platform_metadata_omits_extension_attribute() {
    // Backward compatibility: when the metadata key is missing the attribute
    // is left out entirely, so SIEM consumers see absence rather than "null".
    let graph = AuthorityGraph::new(test_source());
    assert!(!graph.metadata.contains_key("platform"));

    let findings = vec![test_finding(
        FindingCategory::AuthorityPropagation,
        Severity::High,
    )];

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = std::str::from_utf8(&raw).unwrap();
    let first_line = rendered.lines().next().unwrap();
    let event: serde_json::Value = serde_json::from_str(first_line).unwrap();

    let platform = event.get("tauditplatform");
    assert!(
        platform.is_none(),
        "absent metadata must not emit the attribute (not even as null)"
    );
}
1270
/// A platform token outside the canonical set is not surfaced on the event.
#[test]
fn unrecognised_platform_value_is_dropped() {
    // Defence in depth: if some future code path stores a non-canonical
    // value under the metadata key, it must not leak through. SIEM rules
    // pattern-match on the closed enum {ado,gha,gitlab}.
    let findings = vec![test_finding(
        FindingCategory::AuthorityPropagation,
        Severity::High,
    )];
    let mut graph = AuthorityGraph::new(test_source());
    graph
        .metadata
        .insert("platform".to_string(), "circleci".to_string());

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = std::str::from_utf8(&raw).unwrap();
    let first_line = rendered.lines().next().unwrap();
    let event: serde_json::Value = serde_json::from_str(first_line).unwrap();

    let platform = event.get("tauditplatform");
    assert!(
        platform.is_none(),
        "unrecognised platform tokens must be dropped, not surfaced"
    );
}
1298
/// Mirror of `taudit-report-json::tests::json_output_is_byte_deterministic_across_runs`.
/// CloudEvents intentionally mint a fresh `id` (UUID v4) and `time` for each
/// event, so the envelope is non-deterministic by design on those keys.
/// Everything else — `tauditfindingfingerprint`, `tauditfindinggroup`,
/// `tauditruleid`, `data`, `subject`, `tauditcompleteness`, etc — must be
/// stable across re-runs of the same scan, mirroring the JSON contract.
/// The test removes `id`, `time` and `tauditscanrunid` from each parsed
/// event, re-serialises what is left, and asserts the bytes are equal
/// across 9 runs.
#[test]
fn cloudevents_stable_bits_are_deterministic_across_runs() {
    use std::collections::HashMap;
    use taudit_core::graph::{EdgeKind, NodeKind, TrustZone};

    /// Build a fresh graph (two secrets, one step, node and graph metadata)
    /// together with a single finding that references it.
    fn build_graph() -> (AuthorityGraph, Vec<Finding>) {
        let source = PipelineSource {
            file: "ci.yml".into(),
            repo: None,
            git_ref: None,
            commit_sha: None,
        };
        let mut graph = AuthorityGraph::new(source);

        let secret_a = graph.add_node(NodeKind::Secret, "AWS_KEY", TrustZone::FirstParty);
        let secret_b = graph.add_node(NodeKind::Secret, "DEPLOY_TOKEN", TrustZone::FirstParty);
        let step = graph.add_node(NodeKind::Step, "deploy", TrustZone::FirstParty);
        graph.add_edge(step, secret_a, EdgeKind::HasAccessTo);
        graph.add_edge(step, secret_b, EdgeKind::HasAccessTo);

        if let Some(node) = graph.nodes.get_mut(step) {
            // Several keys in a hash map, so any dependence on map
            // iteration order would show up as differing output.
            let meta: HashMap<String, String> = [
                ("z_field", "z"),
                ("a_field", "a"),
                ("m_field", "m"),
                ("k_field", "k"),
                ("c_field", "c"),
            ]
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();
            node.metadata = meta;
        }

        graph
            .metadata
            .insert("trigger".into(), "pull_request".into());
        graph.metadata.insert("platform".into(), "gha".into());

        let finding = Finding {
            severity: Severity::High,
            category: FindingCategory::AuthorityPropagation,
            path: None,
            nodes_involved: vec![secret_a, step],
            message: "AWS_KEY reaches deploy".into(),
            recommendation: Recommendation::Manual {
                action: "scope it".into(),
            },
            source: taudit_core::finding::FindingSource::BuiltIn,
            extras: FindingExtras::default(),
        };
        (graph, vec![finding])
    }

    /// Emit once, parse the first line, drop the keys that are expected to
    /// vary between runs, and return the re-serialised remainder.
    fn emit_and_strip(sink: &CloudEventsJsonlSink) -> Vec<u8> {
        let (graph, findings) = build_graph();
        let mut raw = Vec::new();
        sink.emit(&mut raw, &graph, &findings).unwrap();

        let first_line = std::str::from_utf8(&raw).unwrap().lines().next().unwrap();
        let mut event: serde_json::Value = serde_json::from_str(first_line).unwrap();
        if let Some(fields) = event.as_object_mut() {
            for volatile_key in ["id", "time", "tauditscanrunid"] {
                fields.remove(volatile_key);
            }
        }
        serde_json::to_vec(&event).unwrap()
    }

    // The correlation id is pinned here; the remaining per-run values (`id`,
    // `time`, `tauditscanrunid`) are stripped inside `emit_and_strip`.
    let sink = CloudEventsJsonlSink::with_correlation_id(Some("det-test-correlation".into()));

    let runs: Vec<Vec<u8>> = (0..9).map(|_| emit_and_strip(&sink)).collect();

    let (first, rest) = runs.split_first().unwrap();
    for (offset, run) in rest.iter().enumerate() {
        let i = offset + 1;
        assert_eq!(
            first, run,
            "run 0 and run {i} produced byte-different stable CloudEvent bits (non-determinism regression)"
        );
    }
}
1384
/// `tauditruleid` is the snake_case category for a built-in finding and the
/// `[id]` message prefix for a custom-rule finding.
#[test]
fn rule_id_extension_matches_canonical_helper() {
    // Built-in finding: tauditruleid mirrors `rule_id_for` (snake_case
    // category). Custom-rule finding: the `[id]` prefix of the message takes
    // precedence. This is the per-sink half of the cross-sink equality
    // contract that `cross_sink_contract.rs` covers end-to-end.
    let built_in = test_finding(FindingCategory::OverPrivilegedIdentity, Severity::High);

    let mut custom = test_finding(FindingCategory::AuthorityPropagation, Severity::Critical);
    custom.message = "[my_custom_rule] some prose".into();
    custom.source = taudit_core::finding::FindingSource::Custom {
        source_file: std::path::PathBuf::from("rules/my_custom_rule.yaml"),
    };

    let findings = vec![built_in, custom];
    let graph = AuthorityGraph::new(test_source());

    let mut raw = Vec::new();
    let sink = CloudEventsJsonlSink::default();
    sink.emit(&mut raw, &graph, &findings).unwrap();

    let rendered = std::str::from_utf8(&raw).unwrap();
    let lines: Vec<&str> = rendered.lines().collect();
    let built_in_event: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
    let custom_event: serde_json::Value = serde_json::from_str(lines[1]).unwrap();

    assert_eq!(built_in_event["tauditruleid"], "over_privileged_identity");
    assert_eq!(custom_event["tauditruleid"], "my_custom_rule");
    // `type` remains tied to the FindingCategory: the custom-rule finding
    // still reports `authority_propagation` there for routing stability,
    // while the extension attribute carries the rule-level id.
    assert_eq!(
        custom_event["type"],
        "io.taudit.finding.authority_propagation"
    );
}
1418
1419    #[test]
1420    fn unique_ids_per_event() {
1421        let graph = AuthorityGraph::new(test_source());
1422        let findings = vec![
1423            test_finding(FindingCategory::UnpinnedAction, Severity::Medium),
1424            test_finding(FindingCategory::UnpinnedAction, Severity::Medium),
1425        ];
1426
1427        let mut buf = Vec::new();
1428        CloudEventsJsonlSink::default()
1429            .emit(&mut buf, &graph, &findings)
1430            .unwrap();
1431
1432        let output = String::from_utf8(buf).unwrap();
1433        let ids: Vec<String> = output
1434            .lines()
1435            .map(|l| {
1436                let v: serde_json::Value = serde_json::from_str(l).unwrap();
1437                v["id"].as_str().unwrap().to_string()
1438            })
1439            .collect();
1440
1441        assert_ne!(ids[0], ids[1], "each event must have a unique ID");
1442    }
1443
1444    /// Parallel to `taudit-report-json`'s
1445    /// `every_finding_category_variant_validates_against_report_schema`.
1446    /// The pre-fix CloudEvent schema listed only 10 of the 63
1447    /// `FindingCategory` variants in `data.properties.category.enum`. A
1448    /// poisoned MR pipeline whose finding fired one of the missing 53
1449    /// produced a byte-valid event that strict-validating SIEMs rejected.
1450    /// Enumerate every variant, emit through the JSONL sink, and validate
1451    /// against the published CloudEvent schema.
1452    #[test]
1453    fn every_finding_category_variant_validates_against_cloudevent_schema() {
1454        use taudit_core::finding::FindingCategory as C;
1455
1456        let all: Vec<C> = vec![
1457            C::AuthorityPropagation,
1458            C::OverPrivilegedIdentity,
1459            C::UnpinnedAction,
1460            C::UntrustedWithAuthority,
1461            C::ArtifactBoundaryCrossing,
1462            C::FloatingImage,
1463            C::LongLivedCredential,
1464            C::PersistedCredential,
1465            C::TriggerContextMismatch,
1466            C::CrossWorkflowAuthorityChain,
1467            C::AuthorityCycle,
1468            C::UpliftWithoutAttestation,
1469            C::SelfMutatingPipeline,
1470            C::CheckoutSelfPrExposure,
1471            C::VariableGroupInPrJob,
1472            C::SelfHostedPoolPrHijack,
1473            C::SharedSelfHostedPoolNoIsolation,
1474            C::ServiceConnectionScopeMismatch,
1475            C::TemplateExtendsUnpinnedBranch,
1476            C::TemplateRepoRefIsFeatureBranch,
1477            C::VmRemoteExecViaPipelineSecret,
1478            C::ShortLivedSasInCommandLine,
1479            C::SecretToInlineScriptEnvExport,
1480            C::SecretMaterialisedToWorkspaceFile,
1481            C::KeyVaultSecretToPlaintext,
1482            C::TerraformAutoApproveInProd,
1483            C::AddSpnWithInlineScript,
1484            C::ParameterInterpolationIntoShell,
1485            C::RuntimeScriptFetchedFromFloatingUrl,
1486            C::PrTriggerWithFloatingActionRef,
1487            C::UntrustedApiResponseToEnvSink,
1488            C::PrBuildPushesImageWithFloatingCredentials,
1489            C::SecretViaEnvGateToUntrustedConsumer,
1490            C::NoWorkflowLevelPermissionsBlock,
1491            C::ProdDeployJobNoEnvironmentGate,
1492            C::LongLivedSecretWithoutOidcRecommendation,
1493            C::PullRequestWorkflowInconsistentForkCheck,
1494            C::GitlabDeployJobMissingProtectedBranchOnly,
1495            C::TerraformOutputViaSetvariableShellExpansion,
1496            C::RiskyTriggerWithAuthority,
1497            C::SensitiveValueInJobOutput,
1498            C::ManualDispatchInputToUrlOrCommand,
1499            C::SecretsInheritOverscopedPassthrough,
1500            C::UnsafePrArtifactInWorkflowRunConsumer,
1501            C::ScriptInjectionViaUntrustedContext,
1502            C::InteractiveDebugActionInAuthorityWorkflow,
1503            C::PrSpecificCacheKeyInDefaultBranchConsumer,
1504            C::GhCliWithDefaultTokenEscalating,
1505            C::GhaScriptInjectionToPrivilegedShell,
1506            C::GhaWorkflowRunArtifactPoisoningToPrivilegedConsumer,
1507            C::GhaRemoteScriptInAuthorityJob,
1508            C::GhaPatRemoteUrlWrite,
1509            C::GhaIssueCommentCommandToWriteToken,
1510            C::GhaPrBuildPushesPublishableImage,
1511            C::GhaManualDispatchRefToPrivilegedCheckout,
1512            C::CiJobTokenToExternalApi,
1513            C::IdTokenAudienceOverscoped,
1514            C::UntrustedCiVarInShellInterpolation,
1515            C::UnpinnedIncludeRemoteOrBranchRef,
1516            C::DindServiceGrantsHostAuthority,
1517            C::SecurityJobSilentlySkipped,
1518            C::ChildPipelineTriggerInheritsAuthority,
1519            C::CacheKeyCrossesTrustBoundary,
1520            C::PatEmbeddedInGitRemoteUrl,
1521            C::CiTokenTriggersDownstreamWithVariablePassthrough,
1522            C::DotenvArtifactFlowsToPrivilegedDeployment,
1523            C::SetvariableIssecretFalse,
1524            C::HomoglyphInActionRef,
1525            C::GhaHelperPathSensitiveArgv,
1526            C::GhaHelperPathSensitiveStdin,
1527            C::GhaHelperPathSensitiveEnv,
1528            C::GhaPostAmbientEnvCleanupPath,
1529            C::GhaActionMintedSecretToHelper,
1530            C::GhaHelperUntrustedPathResolution,
1531            C::GhaSecretOutputAfterHelperLogin,
1532            C::LaterSecretMaterializedAfterPathMutation,
1533            C::GhaSetupNodeCacheHelperPathHandoff,
1534            C::GhaSetupPythonCacheHelperPathHandoff,
1535            C::GhaSetupPythonPipInstallAuthorityEnv,
1536            C::GhaSetupGoCacheHelperPathHandoff,
1537            C::GhaDockerSetupQemuPrivilegedDockerHelper,
1538            C::GhaToolInstallerThenShellHelperAuthority,
1539            C::GhaWorkflowShellAuthorityConcentration,
1540            C::GhaActionTokenEnvBeforeBareDownloadHelper,
1541            C::GhaPostActionInputRetargetToCacheSave,
1542            C::GhaTerraformWrapperSensitiveOutput,
1543            C::GhaCompositeBareHelperAfterPathInstallWithSecretEnv,
1544            C::GhaPulumiPathResolvedCliWithAuthority,
1545            C::GhaPypiPublishOidcAfterPathMutation,
1546            C::GhaChangesetsPublishCommandWithAuthority,
1547            C::GhaRubygemsReleaseGitTokenAndOidcHelper,
1548            C::GhaCompositeEntrypointPathShadowWithSecretEnv,
1549            C::GhaDockerBuildxAuthorityPathHandoff,
1550            C::GhaGoogleDeployGcloudCredentialPath,
1551            C::GhaDatadogTestVisibilityInstallerAuthority,
1552            C::GhaKubernetesHelperKubeconfigAuthority,
1553            C::GhaAzureCompanionHelperAuthority,
1554            C::GhaCreatePrGitTokenPathHandoff,
1555            C::GhaImportGpgPrivateKeyHelperPath,
1556            C::GhaSshAgentPrivateKeyToPathHelper,
1557            C::GhaMacosCodesignCertSecurityPath,
1558            C::GhaPagesDeployTokenUrlToGitHelper,
1559            C::GhaManifestNpmLifecycleHookPrTriggerWithToken,
1560            C::GhaManifestPythonMBuildWithPrCredentials,
1561            C::GhaManifestCargoBuildRsPullRequestWithToken,
1562            C::GhaManifestMakefileWithPrTriggerAndSecrets,
1563            C::GhaManifestSubmodulesRecursiveWithPrAuthority,
1564            C::GhaCrossrepoWorkflowCallFloatingRefCascade,
1565            C::GhaCrossrepoSecretsInheritUnreviewedCallee,
1566            C::GhaToolcacheAbsolutePathDowngrade,
1567            C::EgressBlindspot,
1568            C::MissingAuditTrail,
1569        ];
1570
1571        assert_eq!(
1572            all.len(),
1573            112,
1574            "FindingCategory enumeration is out of sync with the schema generator (expected 112, got {})",
1575            all.len()
1576        );
1577
1578        let schema = read_json("contracts/schemas/taudit-cloudevent-finding-v1.schema.json");
1579        let validator =
1580            jsonschema::validator_for(&schema).expect("cloudevent schema should compile");
1581
1582        for category in all {
1583            let graph = AuthorityGraph::new(test_source());
1584            let findings = vec![test_finding(category, Severity::Medium)];
1585
1586            let mut buf = Vec::new();
1587            CloudEventsJsonlSink::default()
1588                .emit(&mut buf, &graph, &findings)
1589                .expect("sink emits");
1590            let output = String::from_utf8(buf).expect("output is UTF-8");
1591            let event: serde_json::Value = serde_json::from_str(output.lines().next().unwrap())
1592                .expect("emitted line is valid JSON");
1593            let errors: Vec<String> = validator
1594                .iter_errors(&event)
1595                .map(|err| err.to_string())
1596                .collect();
1597            assert!(
1598                errors.is_empty(),
1599                "category {category:?} produced an event that fails the published CloudEvent schema:\n{}",
1600                errors.join("\n")
1601            );
1602        }
1603    }
1604}