use std::borrow::Cow;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use url::Url;
use async_trait::async_trait;
use miette::{Diagnostic, NamedSource, SourceSpan};
use reqwest::StatusCode;
use thiserror::Error;
use crate::commands::test::labeler::identity::IdentityFacts;
use crate::commands::test::labeler::report::{CheckResult, CheckStatus, Stage};
use crate::common::diagnostics::pretty_json_for_display;
use crate::common::identity::{Did, is_local_labeler_hostname};
pub mod did_doc_server;
pub mod pollution;
pub mod self_mint;
pub mod sentinel;
#[derive(Debug)]
pub struct RawCreateReportResponse {
pub status: StatusCode,
pub content_type: Option<String>,
pub raw_body: Arc<[u8]>,
pub source_url: String,
}
#[derive(Debug, Error, Diagnostic)]
pub enum CreateReportStageError {
#[error("createReport transport error: {source}")]
#[diagnostic(code = "labeler::report::transport_error")]
Transport {
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
}
#[async_trait]
pub trait CreateReportTee: Send + Sync {
async fn post_create_report(
&self,
auth: Option<&str>,
body: &serde_json::Value,
) -> Result<RawCreateReportResponse, CreateReportStageError>;
}
#[derive(Debug)]
pub struct RawPdsXrpcResponse {
pub status: StatusCode,
pub raw_body: Arc<[u8]>,
pub content_type: Option<String>,
pub source_url: String,
}
#[async_trait]
pub trait PdsXrpcClient: Send + Sync {
async fn post(
&self,
path: &str,
bearer: Option<&str>,
atproto_proxy: Option<&str>,
body: &serde_json::Value,
) -> Result<RawPdsXrpcResponse, CreateReportStageError>;
async fn get(
&self,
path: &str,
bearer: Option<&str>,
query: &[(&str, &str)],
) -> Result<RawPdsXrpcResponse, CreateReportStageError>;
}
pub struct RealPdsXrpcClient {
client: reqwest::Client,
base: Url,
}
impl RealPdsXrpcClient {
pub fn new(client: reqwest::Client, base: Url) -> Self {
Self { client, base }
}
}
#[async_trait]
impl PdsXrpcClient for RealPdsXrpcClient {
async fn post(
&self,
path: &str,
bearer: Option<&str>,
atproto_proxy: Option<&str>,
body: &serde_json::Value,
) -> Result<RawPdsXrpcResponse, CreateReportStageError> {
let mut url = self.base.clone();
url.set_path(path);
let source_url = url.to_string();
let mut req = self
.client
.post(url.as_str())
.header("Content-Type", "application/json")
.body(serde_json::to_vec(body).expect("serde_json::Value always serializes"));
if let Some(b) = bearer {
req = req.header("Authorization", format!("Bearer {b}"));
}
if let Some(p) = atproto_proxy {
req = req.header("atproto-proxy", p);
}
let resp = req
.send()
.await
.map_err(|e| CreateReportStageError::Transport {
source: Box::new(e),
})?;
let status = resp.status();
let content_type = resp
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_ascii_lowercase());
let body = resp
.bytes()
.await
.map_err(|e| CreateReportStageError::Transport {
source: Box::new(e),
})?;
Ok(RawPdsXrpcResponse {
status,
raw_body: Arc::from(body.as_ref()),
content_type,
source_url,
})
}
async fn get(
&self,
path: &str,
bearer: Option<&str>,
query: &[(&str, &str)],
) -> Result<RawPdsXrpcResponse, CreateReportStageError> {
let mut url = self.base.clone();
url.set_path(path);
{
let mut pairs = url.query_pairs_mut();
for (k, v) in query {
pairs.append_pair(k, v);
}
}
let source_url = url.to_string();
let mut req = self.client.get(url.as_str());
if let Some(b) = bearer {
req = req.header("Authorization", format!("Bearer {b}"));
}
let resp = req
.send()
.await
.map_err(|e| CreateReportStageError::Transport {
source: Box::new(e),
})?;
let status = resp.status();
let content_type = resp
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_ascii_lowercase());
let body = resp
.bytes()
.await
.map_err(|e| CreateReportStageError::Transport {
source: Box::new(e),
})?;
Ok(RawPdsXrpcResponse {
status,
raw_body: Arc::from(body.as_ref()),
content_type,
source_url,
})
}
}
pub struct RealCreateReportTee {
client: reqwest::Client,
endpoint: Url,
}
impl RealCreateReportTee {
pub fn new(client: reqwest::Client, endpoint: Url) -> Self {
Self { client, endpoint }
}
}
#[async_trait]
impl CreateReportTee for RealCreateReportTee {
async fn post_create_report(
&self,
auth: Option<&str>,
body: &serde_json::Value,
) -> Result<RawCreateReportResponse, CreateReportStageError> {
let mut url = self.endpoint.clone();
url.set_path("xrpc/com.atproto.moderation.createReport");
let source_url = url.to_string();
tracing::debug!(
url = %source_url,
auth_kind = match auth {
None => "none",
Some(t) if !t.starts_with("ey") => "malformed",
Some(_) => "jwt",
},
"report stage: issuing createReport POST"
);
let mut req = self
.client
.post(url.as_str())
.header("Content-Type", "application/json")
.body(serde_json::to_vec(body).expect("serde_json::Value always serializes"));
if let Some(token) = auth {
req = req.header("Authorization", format!("Bearer {token}"));
}
let response = req
.send()
.await
.map_err(|e| CreateReportStageError::Transport {
source: Box::new(e),
})?;
let status = response.status();
let content_type = response
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_ascii_lowercase());
let body_bytes = response
.bytes()
.await
.map_err(|e| CreateReportStageError::Transport {
source: Box::new(e),
})?;
tracing::debug!(
url = %source_url,
status = %status,
body_len = body_bytes.len(),
"report stage: createReport response received"
);
Ok(RawCreateReportResponse {
status,
content_type,
raw_body: Arc::from(body_bytes.as_ref()),
source_url,
})
}
}
#[derive(Debug)]
pub enum PdsJwtFetchError {
Transport(CreateReportStageError),
Failed(RawPdsXrpcResponse),
InvalidBody {
resp: RawPdsXrpcResponse,
error: serde_json::Error,
},
MissingToken(RawPdsXrpcResponse),
}
impl std::fmt::Display for PdsJwtFetchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PdsJwtFetchError::Transport(e) => write!(f, "getServiceAuth transport: {e}"),
PdsJwtFetchError::Failed(resp) => {
write!(f, "getServiceAuth returned status {}", resp.status)
}
PdsJwtFetchError::InvalidBody { error, .. } => {
write!(f, "getServiceAuth body not JSON: {error}")
}
PdsJwtFetchError::MissingToken(_) => write!(f, "getServiceAuth response missing token"),
}
}
}
impl PdsJwtFetchError {
fn into_diagnostic(self) -> Option<Box<dyn miette::Diagnostic + Send + Sync>> {
match self {
Self::Transport(_) => None,
Self::Failed(resp) | Self::InvalidBody { resp, .. } | Self::MissingToken(resp) => {
let (source_code, span) = body_as_named_source_from_pds(&resp);
let diag = CreateReportDiagnostic::PdsServiceAuthRejected {
origin: ResponseOrigin::Pds,
status: resp.status.as_u16(),
source_code,
span,
};
Some(Box::new(diag))
}
}
}
}
pub struct PdsJwtFetcher<'a> {
client: &'a dyn PdsXrpcClient,
}
impl<'a> PdsJwtFetcher<'a> {
pub fn new(client: &'a dyn PdsXrpcClient) -> Self {
Self { client }
}
pub async fn fetch_with_jwt(
&self,
access_jwt: &str,
aud: &str,
lxm: &str,
exp_absolute_unix: i64,
) -> Result<String, PdsJwtFetchError> {
let exp_s = exp_absolute_unix.to_string();
let resp = self
.client
.get(
"xrpc/com.atproto.server.getServiceAuth",
Some(access_jwt),
&[("aud", aud), ("lxm", lxm), ("exp", &exp_s)],
)
.await
.map_err(PdsJwtFetchError::Transport)?;
if !resp.status.is_success() {
return Err(PdsJwtFetchError::Failed(resp));
}
let token = match serde_json::from_slice::<serde_json::Value>(&resp.raw_body) {
Err(error) => Err(PdsJwtFetchError::InvalidBody { resp, error }),
Ok(auth) => auth["token"]
.as_str()
.map(|s| s.to_string())
.ok_or_else(|| PdsJwtFetchError::MissingToken(resp)),
}?;
Ok(token)
}
}
pub struct PdsProxiedPoster<'a> {
client: &'a dyn PdsXrpcClient,
}
impl<'a> PdsProxiedPoster<'a> {
pub fn new(client: &'a dyn PdsXrpcClient) -> Self {
Self { client }
}
pub async fn post(
&self,
labeler_did: &str,
access_jwt: &str,
body: &serde_json::Value,
) -> Result<RawPdsXrpcResponse, CreateReportStageError> {
self.client
.post(
"xrpc/com.atproto.moderation.createReport",
Some(access_jwt),
Some(&format!("{labeler_did}#atproto_labeler")),
body,
)
.await
}
}
#[derive(Debug, Clone, Default)]
pub struct CreateReportFacts {
pub self_mint_succeeded: Option<bool>,
pub pds_service_auth_succeeded: Option<bool>,
pub pds_proxied_succeeded: Option<bool>,
}
#[derive(Debug)]
pub struct CreateReportStageOutput {
pub facts: Option<CreateReportFacts>,
pub results: Vec<CheckResult>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Check {
ContractPublished,
UnauthenticatedRejected,
MalformedBearerRejected,
WrongAudRejected,
WrongLxmRejected,
ExpiredRejected,
RejectedShapeReturns400,
SelfMintAccepted,
PdsServiceAuthAccepted,
PdsProxiedAccepted,
}
impl Check {
pub fn id(self) -> &'static str {
match self {
Check::ContractPublished => "report::contract_published",
Check::UnauthenticatedRejected => "report::unauthenticated_rejected",
Check::MalformedBearerRejected => "report::malformed_bearer_rejected",
Check::WrongAudRejected => "report::wrong_aud_rejected",
Check::WrongLxmRejected => "report::wrong_lxm_rejected",
Check::ExpiredRejected => "report::expired_rejected",
Check::RejectedShapeReturns400 => "report::rejected_shape_returns_400",
Check::SelfMintAccepted => "report::self_mint_accepted",
Check::PdsServiceAuthAccepted => "report::pds_service_auth_accepted",
Check::PdsProxiedAccepted => "report::pds_proxied_accepted",
}
}
pub const ORDER: [Check; 10] = [
Check::ContractPublished,
Check::UnauthenticatedRejected,
Check::MalformedBearerRejected,
Check::WrongAudRejected,
Check::WrongLxmRejected,
Check::ExpiredRejected,
Check::RejectedShapeReturns400,
Check::SelfMintAccepted,
Check::PdsServiceAuthAccepted,
Check::PdsProxiedAccepted,
];
pub fn pass(self) -> CheckResult {
CheckResult {
id: self.id(),
stage: Stage::Report,
status: CheckStatus::Pass,
summary: Cow::Borrowed(self.default_summary_pass()),
diagnostic: None,
skipped_reason: None,
}
}
pub fn spec_violation(self, diagnostic: CreateReportDiagnostic) -> CheckResult {
CheckResult {
id: self.id(),
stage: Stage::Report,
status: CheckStatus::SpecViolation,
summary: Cow::Borrowed(self.default_summary_fail()),
diagnostic: Some(Box::new(diagnostic) as _),
skipped_reason: None,
}
}
pub fn advisory(self, diagnostic: CreateReportDiagnostic) -> CheckResult {
CheckResult {
id: self.id(),
stage: Stage::Report,
status: CheckStatus::Advisory,
summary: Cow::Borrowed(self.default_summary_fail()),
diagnostic: Some(Box::new(diagnostic) as _),
skipped_reason: None,
}
}
pub fn network_error(self, message: String) -> CheckResult {
CheckResult {
id: self.id(),
stage: Stage::Report,
status: CheckStatus::NetworkError,
summary: Cow::Owned(format!("{}: {message}", self.default_summary_fail())),
diagnostic: None,
skipped_reason: None,
}
}
pub fn skip(self, reason: &'static str) -> CheckResult {
CheckResult {
id: self.id(),
stage: Stage::Report,
status: CheckStatus::Skipped,
summary: Cow::Borrowed(self.default_summary_pass()),
diagnostic: None,
skipped_reason: Some(Cow::Borrowed(reason)),
}
}
fn default_summary_pass(self) -> &'static str {
match self {
Check::ContractPublished => "Labeler advertises reportable shape",
Check::UnauthenticatedRejected => "Unauthenticated report rejected",
Check::MalformedBearerRejected => "Malformed bearer rejected",
Check::WrongAudRejected => "JWT with wrong `aud` rejected",
Check::WrongLxmRejected => "JWT with wrong `lxm` rejected",
Check::ExpiredRejected => "Expired JWT rejected",
Check::RejectedShapeReturns400 => "Invalid shape returns 400 InvalidRequest",
Check::SelfMintAccepted => "Self-mint report accepted",
Check::PdsServiceAuthAccepted => "PDS-minted JWT accepted",
Check::PdsProxiedAccepted => "PDS-proxied report accepted",
}
}
fn default_summary_fail(self) -> &'static str {
match self {
Check::ContractPublished => "Labeler does not advertise a reportable shape",
Check::UnauthenticatedRejected => {
"Unauthenticated report accepted (should have been rejected)"
}
Check::MalformedBearerRejected => {
"Malformed bearer accepted (should have been rejected)"
}
Check::WrongAudRejected => "JWT with wrong `aud` accepted",
Check::WrongLxmRejected => "JWT with wrong `lxm` accepted",
Check::ExpiredRejected => "Expired JWT accepted",
Check::RejectedShapeReturns400 => "Rejection status was not 400 InvalidRequest",
Check::SelfMintAccepted => "Self-mint report rejected",
Check::PdsServiceAuthAccepted => "PDS-minted JWT rejected",
Check::PdsProxiedAccepted => "PDS-proxied report rejected",
}
}
}
pub struct CreateReportRunOptions<'a> {
pub commit_report: bool,
pub force_self_mint: bool,
pub self_mint_curve: self_mint::SelfMintCurve,
pub report_subject_override: Option<&'a crate::common::identity::Did>,
pub self_mint_signer: Option<&'a self_mint::SelfMintSigner>,
pub pds_credentials: Option<&'a crate::commands::test::labeler::pipeline::PdsCredentials>,
pub pds_xrpc_client: Option<&'a dyn PdsXrpcClient>,
pub pds_resolution_error: Option<&'a str>,
pub run_id: &'a str,
}
pub async fn run(
identity_facts: Option<&crate::commands::test::labeler::identity::IdentityFacts>,
report_tee: &dyn CreateReportTee,
opts: &CreateReportRunOptions<'_>,
) -> CreateReportStageOutput {
let mut results = Vec::with_capacity(10);
let Some(id_facts) = identity_facts else {
for c in Check::ORDER {
results.push(c.skip("blocked by identity stage"));
}
return CreateReportStageOutput {
facts: None,
results,
};
};
let reason_types = id_facts.reason_types.as_ref();
let subject_types = id_facts.subject_types.as_ref();
let has_reason_types = reason_types.map(|v| !v.is_empty()).unwrap_or(false);
let has_subject_types = subject_types.map(|v| !v.is_empty()).unwrap_or(false);
let contract_advertised = has_reason_types && has_subject_types;
if !contract_advertised {
if opts.commit_report {
let diag = CreateReportDiagnostic::ContractMissing {
has_reason_types,
has_subject_types,
};
results.push(Check::ContractPublished.spec_violation(diag));
for c in Check::ORDER.iter().skip(1).copied() {
results.push(c.skip("blocked by `report::contract_published`"));
}
} else {
results.push(
Check::ContractPublished.skip("labeler does not advertise report acceptance"),
);
for c in Check::ORDER.iter().skip(1).copied() {
results.push(c.skip("labeler does not advertise report acceptance"));
}
}
return CreateReportStageOutput {
facts: None,
results,
};
}
results.push(Check::ContractPublished.pass());
let negative_body = build_minimal_report_body(id_facts);
match report_tee.post_create_report(None, &negative_body).await {
Ok(resp) => match RejectionShape::classify(&resp) {
RejectionShape::Conformant { .. } => {
results.push(Check::UnauthenticatedRejected.pass());
}
RejectionShape::ConformantStatusNonConformantShape => {
results.push(CheckResult {
summary: Cow::Borrowed(
"Unauthenticated report rejected (status 401, non-conformant envelope)",
),
..Check::UnauthenticatedRejected.pass()
});
}
RejectionShape::WrongStatus { status } => {
let status_u16 = status.as_u16();
let (source_code, span) = body_as_named_source(&resp);
let diag = CreateReportDiagnostic::UnauthenticatedAccepted {
status: status_u16,
source_code,
span,
};
results.push(Check::UnauthenticatedRejected.spec_violation(diag));
}
},
Err(CreateReportStageError::Transport { source }) => {
results.push(Check::UnauthenticatedRejected.network_error(source.to_string()));
}
}
match report_tee
.post_create_report(Some("not-a-jwt"), &negative_body)
.await
{
Ok(resp) => match RejectionShape::classify(&resp) {
RejectionShape::Conformant { .. } => {
results.push(Check::MalformedBearerRejected.pass());
}
RejectionShape::ConformantStatusNonConformantShape => {
results.push(CheckResult {
summary: Cow::Borrowed(
"Malformed bearer rejected (status 401, non-conformant envelope)",
),
..Check::MalformedBearerRejected.pass()
});
}
RejectionShape::WrongStatus { status } => {
let status_u16 = status.as_u16();
let (source_code, span) = body_as_named_source(&resp);
let diag = CreateReportDiagnostic::MalformedBearerAccepted {
status: status_u16,
source_code,
span,
};
results.push(Check::MalformedBearerRejected.spec_violation(diag));
}
},
Err(CreateReportStageError::Transport { source }) => {
results.push(Check::MalformedBearerRejected.network_error(source.to_string()));
}
}
let is_local_labeler = is_local_labeler_hostname(&id_facts.labeler_endpoint);
let self_mint_viable = opts.force_self_mint || is_local_labeler;
let signer_for_negative = if self_mint_viable {
opts.self_mint_signer
} else {
None
};
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
if let Some(signer) = signer_for_negative {
let lxm = "com.atproto.moderation.createReport";
let template =
signer.valid_claims_template(&id_facts.did, lxm, now, Duration::from_secs(60));
let negative_body = build_minimal_report_body(id_facts);
{
let mut claims = template.clone();
claims.aud = "did:plc:0000000000000000000000000".to_string();
let token = signer.sign_jwt(claims);
match report_tee
.post_create_report(Some(&token), &negative_body)
.await
{
Ok(resp) => match RejectionShape::classify(&resp) {
RejectionShape::Conformant { .. } => {
results.push(Check::WrongAudRejected.pass())
}
RejectionShape::ConformantStatusNonConformantShape => {
results.push(CheckResult {
summary: Cow::Borrowed(
"Rejected with 401 but envelope is non-conformant",
),
..Check::WrongAudRejected.pass()
})
}
RejectionShape::WrongStatus { .. } => {
let (source_code, span) = body_as_named_source(&resp);
let diag = CreateReportDiagnostic::WrongAudAccepted {
status: resp.status.as_u16(),
source_code,
span,
};
results.push(Check::WrongAudRejected.spec_violation(diag));
}
},
Err(CreateReportStageError::Transport { source }) => {
results.push(Check::WrongAudRejected.network_error(source.to_string()));
}
}
}
{
let mut claims = template.clone();
claims.lxm = "com.atproto.server.getSession".to_string();
let token = signer.sign_jwt(claims);
match report_tee
.post_create_report(Some(&token), &negative_body)
.await
{
Ok(resp) => match RejectionShape::classify(&resp) {
RejectionShape::Conformant { .. } => {
results.push(Check::WrongLxmRejected.pass())
}
RejectionShape::ConformantStatusNonConformantShape => {
results.push(CheckResult {
summary: Cow::Borrowed(
"Rejected with 401 but envelope is non-conformant",
),
..Check::WrongLxmRejected.pass()
})
}
RejectionShape::WrongStatus { .. } => {
let (source_code, span) = body_as_named_source(&resp);
let diag = CreateReportDiagnostic::WrongLxmAccepted {
status: resp.status.as_u16(),
source_code,
span,
};
results.push(Check::WrongLxmRejected.spec_violation(diag));
}
},
Err(CreateReportStageError::Transport { source }) => {
results.push(Check::WrongLxmRejected.network_error(source.to_string()));
}
}
}
{
let mut claims = template.clone();
claims.exp = now - 300;
claims.iat = now - 360;
let token = signer.sign_jwt(claims);
match report_tee
.post_create_report(Some(&token), &negative_body)
.await
{
Ok(resp) => match RejectionShape::classify(&resp) {
RejectionShape::Conformant { .. } => {
results.push(Check::ExpiredRejected.pass())
}
RejectionShape::ConformantStatusNonConformantShape => {
results.push(CheckResult {
summary: Cow::Borrowed(
"Rejected with 401 but envelope is non-conformant",
),
..Check::ExpiredRejected.pass()
})
}
RejectionShape::WrongStatus { .. } => {
let (source_code, span) = body_as_named_source(&resp);
let diag = CreateReportDiagnostic::ExpiredAccepted {
status: resp.status.as_u16(),
source_code,
span,
};
results.push(Check::ExpiredRejected.spec_violation(diag));
}
},
Err(CreateReportStageError::Transport { source }) => {
results.push(Check::ExpiredRejected.network_error(source.to_string()));
}
}
}
{
let claims = template.clone();
let token = signer.sign_jwt(claims);
let bogus_reason_type = synth_unadvertised_reason_type(id_facts);
let invalid_body = {
let mut body = negative_body.clone();
if let Some(obj) = body.as_object_mut() {
obj.insert(
"reasonType".to_string(),
serde_json::Value::String(bogus_reason_type),
);
}
body
};
match report_tee
.post_create_report(Some(&token), &invalid_body)
.await
{
Ok(resp) => {
let envelope = XrpcErrorEnvelope::parse(&resp.raw_body);
let error_name = envelope.as_ref().and_then(|e| e.error.clone());
if resp.status == reqwest::StatusCode::BAD_REQUEST
&& error_name.as_deref() == Some("InvalidRequest")
{
results.push(Check::RejectedShapeReturns400.pass());
} else if resp.status == reqwest::StatusCode::UNAUTHORIZED
|| resp.status.is_server_error()
{
let (source_code, span) = body_as_named_source(&resp);
let diag = CreateReportDiagnostic::ShapeNot400 {
status: resp.status.as_u16(),
error_name: error_name.clone(),
source_code,
span,
};
results.push(Check::RejectedShapeReturns400.advisory(diag));
} else if resp.status == reqwest::StatusCode::BAD_REQUEST {
let (source_code, span) = body_as_named_source(&resp);
let diag = CreateReportDiagnostic::ShapeNot400 {
status: 400,
error_name: error_name.clone(),
source_code,
span,
};
results.push(Check::RejectedShapeReturns400.advisory(diag));
} else {
let (source_code, span) = body_as_named_source(&resp);
let diag = CreateReportDiagnostic::ShapeNot400 {
status: resp.status.as_u16(),
error_name,
source_code,
span,
};
results.push(Check::RejectedShapeReturns400.advisory(diag));
}
}
Err(CreateReportStageError::Transport { source }) => {
results.push(Check::RejectedShapeReturns400.network_error(source.to_string()));
}
}
}
} else {
let reason = "self-mint required; labeler endpoint appears non-local (override with --force-self-mint)";
for c in [
Check::WrongAudRejected,
Check::WrongLxmRejected,
Check::ExpiredRejected,
Check::RejectedShapeReturns400,
] {
results.push(c.skip(reason));
}
}
if !opts.commit_report {
results.push(Check::SelfMintAccepted.skip("commit gated behind --commit-report"));
} else if let Some(signer) = signer_for_negative {
let reason_type = pollution::choose_reason_type(
id_facts.reason_types.as_deref().unwrap_or(&[]),
is_local_labeler,
);
let subject = pollution::choose_subject(
id_facts.subject_types.as_deref().unwrap_or(&[]),
signer.issuer_did(),
opts.report_subject_override,
is_local_labeler,
);
let sentinel = sentinel::build(opts.run_id, SystemTime::now());
let positive_body = serde_json::json!({
"reasonType": reason_type,
"subject": subject,
"reason": sentinel,
});
let claims = signer.valid_claims_template(
&id_facts.did,
"com.atproto.moderation.createReport",
now,
Duration::from_secs(60),
);
let token = signer.sign_jwt(claims);
match report_tee
.post_create_report(Some(&token), &positive_body)
.await
{
Ok(resp) if resp.status.is_success() => {
let body_ok = serde_json::from_slice::<serde_json::Value>(&resp.raw_body)
.ok()
.and_then(|v| v.get("id").and_then(|id| id.as_i64()))
.is_some();
if body_ok {
results.push(Check::SelfMintAccepted.pass());
} else {
results.push(CheckResult {
summary: Cow::Borrowed(
"Self-mint report accepted (2xx), body did not match createReport#output shape",
),
..Check::SelfMintAccepted.pass()
});
}
}
Ok(resp) => {
let (source_code, span) = body_as_named_source(&resp);
let diag = CreateReportDiagnostic::SelfMintRejected {
status: resp.status.as_u16(),
source_code,
span,
};
results.push(Check::SelfMintAccepted.spec_violation(diag));
}
Err(CreateReportStageError::Transport { source }) => {
results.push(Check::SelfMintAccepted.network_error(source.to_string()));
}
}
} else {
let reason = "self-mint required; labeler endpoint appears non-local (override with --force-self-mint)";
results.push(Check::SelfMintAccepted.skip(reason));
}
let pds_gate_reason: &'static str = "requires --handle, --app-password, and --commit-report";
let pds_ready =
opts.commit_report && opts.pds_credentials.is_some() && opts.pds_xrpc_client.is_some();
let pds_resolution_failed = opts.commit_report
&& opts.pds_credentials.is_some()
&& opts.pds_xrpc_client.is_none()
&& opts.pds_resolution_error.is_some();
if pds_resolution_failed {
let msg = opts
.pds_resolution_error
.expect("pds_resolution_failed implies Some")
.to_string();
results.push(Check::PdsServiceAuthAccepted.network_error(msg.clone()));
results.push(Check::PdsProxiedAccepted.network_error(msg));
} else if !pds_ready {
results.push(Check::PdsServiceAuthAccepted.skip(pds_gate_reason));
results.push(Check::PdsProxiedAccepted.skip(pds_gate_reason));
} else {
let creds = opts.pds_credentials.expect("pds_ready implies creds");
let pds_client = opts.pds_xrpc_client.expect("pds_ready implies client");
let is_local = is_local_labeler;
let reason_type = pollution::choose_reason_type(
id_facts.reason_types.as_deref().unwrap_or(&[]),
is_local,
);
match fetch_session_and_did(pds_client, &creds.handle, &creds.app_password).await {
Err(message) => {
results.push(Check::PdsServiceAuthAccepted.network_error(message.clone()));
results.push(Check::PdsProxiedAccepted.network_error(message));
}
Ok(session) => {
let user_did = Did(session.did);
let access_jwt = session.access_jwt;
let subject = pollution::choose_subject(
id_facts.subject_types.as_deref().unwrap_or(&[]),
&user_did,
opts.report_subject_override,
is_local,
);
let sentinel = sentinel::build(opts.run_id, SystemTime::now());
let pds_body = serde_json::json!({
"reasonType": reason_type,
"subject": subject,
"reason": sentinel,
});
let exp_abs = now + 60;
let fetcher = PdsJwtFetcher::new(pds_client);
match fetcher
.fetch_with_jwt(
&access_jwt,
&id_facts.did.0,
"com.atproto.moderation.createReport",
exp_abs,
)
.await
{
Err(e) => {
let message = e.to_string();
let diagnostic = e.into_diagnostic();
results.push(CheckResult {
diagnostic,
..Check::PdsServiceAuthAccepted.network_error(message)
});
}
Ok(service_jwt) => {
match report_tee
.post_create_report(Some(&service_jwt), &pds_body)
.await
{
Ok(resp) if resp.status.is_success() => {
results.push(Check::PdsServiceAuthAccepted.pass());
}
Ok(resp) => {
let (source_code, span) = body_as_named_source(&resp);
let diag = CreateReportDiagnostic::PdsServiceAuthRejected {
origin: ResponseOrigin::Labeler,
status: resp.status.as_u16(),
source_code,
span,
};
results.push(Check::PdsServiceAuthAccepted.spec_violation(diag));
}
Err(CreateReportStageError::Transport { source }) => {
results.push(
Check::PdsServiceAuthAccepted.network_error(source.to_string()),
);
}
}
}
}
let proxier = PdsProxiedPoster::new(pds_client);
match proxier.post(&id_facts.did.0, &access_jwt, &pds_body).await {
Err(CreateReportStageError::Transport { source }) => {
results.push(Check::PdsProxiedAccepted.network_error(source.to_string()));
}
Ok(resp) if resp.status.is_success() => {
results.push(Check::PdsProxiedAccepted.pass());
}
Ok(resp) => {
let (source_code, span) = body_as_named_source_from_pds(&resp);
let envelope = XrpcErrorEnvelope::parse(&resp.raw_body);
let err_name = envelope.as_ref().and_then(|e| e.error.clone());
let is_upstream_label_error = matches!(
err_name.as_deref(),
Some("UpstreamError") | Some("UpstreamFailure")
) || resp.status.as_u16() == 502
|| resp.status.as_u16() == 504;
if is_upstream_label_error {
let diag = CreateReportDiagnostic::PdsProxiedRejected {
origin: ResponseOrigin::Labeler,
status: resp.status.as_u16(),
source_code,
span,
};
results.push(Check::PdsProxiedAccepted.spec_violation(diag));
} else {
let diag = CreateReportDiagnostic::PdsProxiedRejected {
origin: ResponseOrigin::Pds,
status: resp.status.as_u16(),
source_code,
span,
};
results.push(CheckResult {
diagnostic: Some(Box::new(diag)),
..Check::PdsProxiedAccepted.network_error(format!(
"PDS rejected proxy attempt with status {}",
resp.status
))
});
}
}
}
}
}
}
CreateReportStageOutput {
facts: None,
results,
}
}
struct SessionResult {
did: String,
access_jwt: String,
}
async fn fetch_session_and_did(
client: &dyn PdsXrpcClient,
handle: &str,
app_password: &str,
) -> Result<SessionResult, String> {
let body = serde_json::json!({ "identifier": handle, "password": app_password });
let resp = client
.post("xrpc/com.atproto.server.createSession", None, None, &body)
.await
.map_err(|e| format!("createSession transport: {e}"))?;
if !resp.status.is_success() {
return Err(format!("createSession returned {}", resp.status));
}
let session: serde_json::Value =
serde_json::from_slice(&resp.raw_body).map_err(|e| format!("createSession body: {e}"))?;
let did = session["did"]
.as_str()
.ok_or("createSession missing did")?
.to_string();
let access_jwt = session["accessJwt"]
.as_str()
.ok_or("createSession missing accessJwt")?
.to_string();
Ok(SessionResult { did, access_jwt })
}
fn synth_unadvertised_reason_type(facts: &IdentityFacts) -> String {
let empty = Vec::new();
let advertised: &[String] = facts.reason_types.as_ref().unwrap_or(&empty);
for i in 0..1000 {
let candidate = format!("xyz.atprotodevtool.conformance.defs#unadvertised{i:03}");
if !advertised.iter().any(|r| r == &candidate) {
return candidate;
}
}
"xyz.atprotodevtool.conformance.defs#unadvertisedFallback".to_string()
}
#[cfg(test)]
mod check_tests {
use super::*;
#[test]
fn check_ids_are_unique_and_report_namespaced() {
let mut seen = std::collections::HashSet::new();
for c in Check::ORDER {
let id = c.id();
assert!(id.starts_with("report::"), "{id} not in report:: namespace");
assert!(seen.insert(id), "duplicate check id: {id}");
}
assert_eq!(Check::ORDER.len(), 10, "DoD requires exactly 10 checks");
}
}
#[derive(Debug, Clone)]
pub struct XrpcErrorEnvelope {
pub error: Option<String>,
pub message: Option<String>,
}
impl XrpcErrorEnvelope {
pub fn parse(body: &[u8]) -> Option<Self> {
let v: serde_json::Value = serde_json::from_slice(body).ok()?;
let obj = v.as_object()?;
Some(Self {
error: obj.get("error").and_then(|x| x.as_str()).map(String::from),
message: obj
.get("message")
.and_then(|x| x.as_str())
.map(String::from),
})
}
pub fn has_nonempty_error(&self) -> bool {
self.error
.as_deref()
.map(|s| !s.is_empty())
.unwrap_or(false)
}
}
pub enum RejectionShape {
Conformant {
envelope: XrpcErrorEnvelope,
},
ConformantStatusNonConformantShape,
WrongStatus {
status: reqwest::StatusCode,
},
}
impl RejectionShape {
pub fn classify(resp: &RawCreateReportResponse) -> Self {
if resp.status != reqwest::StatusCode::UNAUTHORIZED {
return Self::WrongStatus {
status: resp.status,
};
}
match XrpcErrorEnvelope::parse(&resp.raw_body) {
Some(env) if env.has_nonempty_error() => Self::Conformant { envelope: env },
_ => Self::ConformantStatusNonConformantShape,
}
}
}
#[derive(Debug, Error, Diagnostic)]
pub enum CreateReportDiagnostic {
#[error("Labeler does not advertise a reportable `LabelerPolicies` shape")]
#[diagnostic(
code = "labeler::report::contract_missing",
help = "`reasonTypes` and `subjectTypes` must both be present and non-empty on the labeler's published policies; the tool cannot verify reporting conformance without them."
)]
ContractMissing {
has_reason_types: bool,
has_subject_types: bool,
},
#[error("Labeler accepted unauthenticated createReport (status {status})")]
#[diagnostic(
code = "labeler::report::unauthenticated_accepted",
help = "A labeler must reject createReport with 401 when no Authorization header is supplied."
)]
UnauthenticatedAccepted {
status: u16,
#[source_code]
source_code: NamedSource<Arc<[u8]>>,
#[label("accepted here")]
span: SourceSpan,
},
#[error("Labeler accepted malformed Bearer token (status {status})")]
#[diagnostic(
code = "labeler::report::malformed_bearer_accepted",
help = "A labeler must reject createReport with 401 when the Authorization header carries a non-JWT string."
)]
MalformedBearerAccepted {
status: u16,
#[source_code]
source_code: NamedSource<Arc<[u8]>>,
#[label("accepted here")]
span: SourceSpan,
},
#[error("Labeler accepted JWT with wrong `aud` (status {status})")]
#[diagnostic(
code = "labeler::report::wrong_aud_accepted",
help = "A labeler must reject JWTs whose `aud` claim does not match its own DID."
)]
WrongAudAccepted {
status: u16,
#[source_code]
source_code: NamedSource<Arc<[u8]>>,
#[label("accepted here")]
span: SourceSpan,
},
#[error("Labeler accepted JWT with wrong `lxm` (status {status})")]
#[diagnostic(
code = "labeler::report::wrong_lxm_accepted",
help = "A labeler must reject JWTs whose `lxm` claim does not match the invoked Lexicon method."
)]
WrongLxmAccepted {
status: u16,
#[source_code]
source_code: NamedSource<Arc<[u8]>>,
#[label("accepted here")]
span: SourceSpan,
},
#[error("Labeler accepted expired JWT (status {status})")]
#[diagnostic(
code = "labeler::report::expired_accepted",
help = "A labeler must reject JWTs whose `exp` claim is in the past."
)]
ExpiredAccepted {
status: u16,
#[source_code]
source_code: NamedSource<Arc<[u8]>>,
#[label("accepted here")]
span: SourceSpan,
},
#[error(
"Unadvertised `reasonType` was rejected with status {status}, expected 400 InvalidRequest"
)]
#[diagnostic(
code = "labeler::report::shape_not_400",
help = "A labeler should return 400 InvalidRequest (not 401 or 500) for a `reasonType` not listed in its published LabelerPolicies.reasonTypes."
)]
ShapeNot400 {
status: u16,
error_name: Option<String>,
#[source_code]
source_code: NamedSource<Arc<[u8]>>,
#[label("rejected with wrong status here")]
span: SourceSpan,
},
#[error("Self-mint report rejected (status {status})")]
#[diagnostic(
code = "labeler::report::self_mint_rejected",
help = "A labeler that advertises reportable shape should accept a well-formed, authenticated createReport. Check the labeler's service-auth validation and its acceptance of the advertised reasonType/subject shape."
)]
SelfMintRejected {
status: u16,
#[source_code]
source_code: NamedSource<Arc<[u8]>>,
#[label("rejected here")]
span: SourceSpan,
},
#[error("{origin} rejected PDS-minted service-auth createReport (status {status})")]
#[diagnostic(
code = "labeler::report::pds_service_auth_rejected",
help = "When `origin` is `Labeler`, the PDS issued a service-auth JWT bound to the labeler's DID and the createReport NSID; the labeler should have accepted it. When `origin` is `PDS`, the user's PDS refused to mint the service-auth JWT — verify the handle and app password, and confirm `--handle` resolves to a PDS that can mint service-auth tokens for this user."
)]
PdsServiceAuthRejected {
origin: ResponseOrigin,
status: u16,
#[source_code]
source_code: NamedSource<Arc<[u8]>>,
#[label("rejected here")]
span: SourceSpan,
},
#[error("{origin} rejected PDS-proxied createReport (status {status})")]
#[diagnostic(
code = "labeler::report::pds_proxied_rejected",
help = "When `origin` is `Labeler`, the PDS forwarded the createReport call on the user's behalf; the downstream labeler reached it but rejected the submission. When `origin` is `PDS`, the user's PDS rejected the proxied call before it could reach the labeler — verify the handle, app password, and that the PDS is configured to proxy moderation calls to the target labeler."
)]
PdsProxiedRejected {
origin: ResponseOrigin,
status: u16,
#[source_code]
source_code: NamedSource<Arc<[u8]>>,
#[label("rejected here")]
span: SourceSpan,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseOrigin {
Labeler,
Pds,
}
impl std::fmt::Display for ResponseOrigin {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ResponseOrigin::Labeler => f.write_str("Labeler"),
ResponseOrigin::Pds => f.write_str("PDS"),
}
}
}
pub(crate) fn body_as_named_source(
resp: &RawCreateReportResponse,
) -> (NamedSource<Arc<[u8]>>, SourceSpan) {
let pretty = pretty_json_for_display(&resp.raw_body);
let span = SourceSpan::new(0.into(), pretty.len());
(NamedSource::new(resp.source_url.clone(), pretty), span)
}
pub(crate) fn body_as_named_source_from_pds(
resp: &RawPdsXrpcResponse,
) -> (NamedSource<Arc<[u8]>>, SourceSpan) {
let pretty = pretty_json_for_display(&resp.raw_body);
let span = SourceSpan::new(0.into(), pretty.len());
(NamedSource::new(resp.source_url.clone(), pretty), span)
}
pub(crate) fn build_minimal_report_body(facts: &IdentityFacts) -> serde_json::Value {
let reason_type = facts
.reason_types
.as_ref()
.and_then(|v| v.first())
.cloned()
.unwrap_or_else(|| "com.atproto.moderation.defs#reasonOther".to_string());
let subject_types: &[String] = facts.subject_types.as_deref().unwrap_or(&[]);
let subject = if subject_types.iter().any(|t| t == "account") {
serde_json::json!({
"$type": "com.atproto.admin.defs#repoRef",
"did": facts.did.0,
})
} else if subject_types.iter().any(|t| t == "record") {
serde_json::json!({
"$type": "com.atproto.repo.strongRef",
"uri": format!("at://{}/app.bsky.feed.post/not-real", facts.did.0),
"cid": "bafyreiaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
})
} else {
serde_json::json!({
"$type": "com.atproto.admin.defs#repoRef",
"did": facts.did.0,
})
};
serde_json::json!({
"reasonType": reason_type,
"subject": subject,
})
}
#[cfg(test)]
mod envelope_tests {
use super::*;
#[test]
fn parse_well_formed_envelope() {
let body = br#"{"error":"BadJwt","message":"invalid token"}"#;
let env = XrpcErrorEnvelope::parse(body).expect("parses");
assert_eq!(env.error.as_deref(), Some("BadJwt"));
assert_eq!(env.message.as_deref(), Some("invalid token"));
assert!(env.has_nonempty_error());
}
#[test]
fn parse_empty_envelope() {
let body = br#"{}"#;
let env = XrpcErrorEnvelope::parse(body).expect("parses empty object");
assert_eq!(env.error, None);
assert!(!env.has_nonempty_error());
}
#[test]
fn parse_non_json_returns_none() {
assert!(XrpcErrorEnvelope::parse(b"<html>").is_none());
}
#[test]
fn parse_empty_error_field_treated_as_missing() {
let body = br#"{"error":""}"#;
let env = XrpcErrorEnvelope::parse(body).unwrap();
assert!(!env.has_nonempty_error());
}
}