atproto-devtool 0.1.1

A multitool for the atproto developer ecosystem
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
//! HTTP stage for the labeler conformance suite.
//!
//! Performs `com.atproto.label.queryLabels` requests against the labeler endpoint,
//! verifies schema conformance, and exercises pagination.

use std::borrow::Cow;
use std::sync::Arc;

use async_trait::async_trait;
use atrium_api::com::atproto::label::defs::Label;
use atrium_api::com::atproto::label::query_labels;
use base64::Engine;
use miette::{Diagnostic, NamedSource, SourceSpan};
use thiserror::Error;
use url::Url;

use crate::commands::test::labeler::report::{CheckResult, CheckStatus, Stage};
use crate::common::diagnostics::{pretty_json_for_display, span_at_line_column};

/// Checks emitted by the HTTP stage.
///
/// Each variant maps to a stable string identifier (see `Check::id`) and can be
/// turned into a `CheckResult` through the constructor methods on this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Check {
    /// Whether the labeler endpoint responded to an HTTP request.
    ///
    /// `run` records this as passing for any HTTP response it receives,
    /// including non-2xx statuses; only transport failures count as unreachable.
    EndpointReachable,
    /// Schema validation of the first queryLabels page.
    QueryLabelsSchemaFirstPage,
    /// Advisory when the labeler has no published labels.
    QueryLabelsEmptyAdvisory,
    /// Schema validation of the second queryLabels page.
    QueryLabelsSchemaSecondPage,
    /// Pagination cursor round-trip check.
    PaginationRoundTrip,
    /// Labeler ignored the cursor parameter (the follow-up page repeated the
    /// labels already returned on the first page).
    PaginationIgnoredCursor,
}

impl Check {
    /// Stable check ID string used in `CheckResult.id`.
    pub fn id(self) -> &'static str {
        match self {
            Check::EndpointReachable => "http::endpoint_reachable",
            Check::QueryLabelsSchemaFirstPage => "http::query_labels_schema_first_page",
            Check::QueryLabelsEmptyAdvisory => "http::query_labels_empty_advisory",
            Check::QueryLabelsSchemaSecondPage => "http::query_labels_schema_second_page",
            Check::PaginationRoundTrip => "http::pagination_round_trip",
            Check::PaginationIgnoredCursor => "http::pagination_ignored_cursor",
        }
    }

    pub fn pass(self) -> CheckResult {
        CheckResult {
            id: self.id(),
            stage: Stage::Http,
            status: CheckStatus::Pass,
            summary: Cow::Borrowed(match self {
                Check::EndpointReachable => "Labeler endpoint reachability",
                Check::QueryLabelsSchemaFirstPage => "First page schema",
                Check::QueryLabelsSchemaSecondPage => "Second page schema",
                Check::PaginationRoundTrip => "Pagination round-trip",
                _ => "HTTP check passed",
            }),
            diagnostic: None,
            skipped_reason: None,
        }
    }

    pub fn spec_violation(
        self,
        diagnostic: Option<Box<dyn miette::Diagnostic + Send + Sync>>,
    ) -> CheckResult {
        CheckResult {
            id: self.id(),
            stage: Stage::Http,
            status: CheckStatus::SpecViolation,
            summary: Cow::Borrowed(match self {
                Check::QueryLabelsSchemaFirstPage => "Schema validation failed",
                Check::QueryLabelsSchemaSecondPage => "Second page schema validation failed",
                Check::PaginationIgnoredCursor => "Labeler ignored the cursor parameter",
                _ => "HTTP check failed",
            }),
            diagnostic,
            skipped_reason: None,
        }
    }

    pub fn network_error(self) -> CheckResult {
        CheckResult {
            id: self.id(),
            stage: Stage::Http,
            status: CheckStatus::NetworkError,
            summary: Cow::Borrowed(match self {
                Check::EndpointReachable => "Labeler endpoint unreachable",
                Check::QueryLabelsSchemaSecondPage => "Second page fetch failed",
                _ => "HTTP network error",
            }),
            diagnostic: None,
            skipped_reason: None,
        }
    }

    pub fn advisory(self) -> CheckResult {
        CheckResult {
            id: self.id(),
            stage: Stage::Http,
            status: CheckStatus::Advisory,
            summary: Cow::Borrowed(match self {
                Check::QueryLabelsEmptyAdvisory => "Labeler has no published labels",
                _ => "HTTP advisory",
            }),
            diagnostic: None,
            skipped_reason: None,
        }
    }
}

/// Facts gathered from the HTTP stage.
///
/// As built by `run` in this module, these are produced whenever the first
/// `queryLabels` page was fetched and decoded. A failed pagination check does
/// not suppress them; it is reported through `pagination_ok` instead.
#[derive(Debug, Clone)]
pub struct HttpFacts {
    /// The parsed labels from the first page of the query response.
    pub first_page: Vec<Label>,
    /// Raw bytes of the first page response body.
    pub first_page_raw_bytes: Arc<[u8]>,
    /// The source URL where the first page was retrieved.
    pub first_page_source_url: String,
    /// Whether pagination passed the round-trip check. `run` also sets this to
    /// `true` when the first page carried no cursor, so pagination was not exercised.
    pub pagination_ok: bool,
}

/// Output from the HTTP stage: optional facts plus all check results.
#[derive(Debug)]
pub struct HttpStageOutput {
    /// Facts for later stages. `run` leaves this as `None` when the first
    /// request fails with a transport error or its body cannot be decoded.
    pub facts: Option<HttpFacts>,
    /// All check results from this stage, in the order they were recorded.
    pub results: Vec<CheckResult>,
}

/// A `queryLabels` HTTP response carrying both the decoded output and the raw body.
///
/// Keeping the raw bytes alongside the typed value lets callers use the exact
/// wire payload later (for example in diagnostics).
pub struct RawXrpcResponse {
    /// HTTP status code.
    pub status: reqwest::StatusCode,
    /// Raw response body bytes, exactly as received.
    pub raw_body: Arc<[u8]>,
    /// Decoded typed response.
    pub decoded: query_labels::Output,
    /// The source URL where the response came from.
    pub source_url: String,
}

/// Diagnostic for schema decode failures with source context.
///
/// Rendered through miette: `message` is the headline, `source_code` supplies
/// the body shown to the user, and `span` marks the error position within it.
#[derive(Debug, Error, Diagnostic)]
#[error("{message}")]
#[diagnostic(code = "labeler::http::schema_failure")]
pub struct HttpDecodeFailure {
    /// The error message.
    pub message: String,
    /// The response bytes shown as source context (`run` passes a
    /// pretty-printed copy of the body), named after the request URL.
    #[source_code]
    pub source_code: NamedSource<Arc<[u8]>>,
    /// Span highlighting the error location inside `source_code`.
    #[label("JSON error")]
    pub span: Option<SourceSpan>,
}

/// Error type for HTTP stage operations.
///
/// Separates "the request or body read failed" (`Transport`) from "a body was
/// received but does not match the expected schema" (`DecodeFailed`).
#[derive(Debug, Error)]
pub enum HttpStageError {
    /// Network or TLS error reaching the endpoint.
    #[error("HTTP transport error: {message}")]
    Transport {
        /// Human-readable error message.
        message: String,
        /// The underlying error, if available.
        #[source]
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },

    /// Decode failure of a received HTTP response body.
    #[error("Schema decode failure")]
    DecodeFailed {
        /// Raw response body bytes, kept so a diagnostic can show the payload.
        raw_body: Arc<[u8]>,
        /// The JSON decode error.
        source: serde_json::Error,
        /// The source URL for diagnostic context.
        source_url: String,
    },
}

/// Trait for teeing HTTP responses, allowing both decode and raw bytes capture.
///
/// Implementations perform `com.atproto.label.queryLabels` calls and return both
/// the decoded typed response and the raw bytes for diagnostics.
#[async_trait]
pub trait RawHttpTee: Send + Sync {
    /// Perform a `com.atproto.label.queryLabels` call against the labeler.
    ///
    /// On success the returned `RawXrpcResponse` holds the status, the raw
    /// body, and the typed output together.
    ///
    /// # Arguments
    /// * `cursor` - Optional cursor string for pagination.
    ///
    /// # Errors
    /// * `HttpStageError::Transport` when the request or body read fails.
    /// * `HttpStageError::DecodeFailed` when a body was received but could not
    ///   be decoded; the raw bytes travel inside the error.
    async fn query_labels(&self, cursor: Option<&str>) -> Result<RawXrpcResponse, HttpStageError>;
}

/// Real HTTP client implementation using reqwest.
pub struct RealHttpTee {
    /// The base HTTP client used to send requests.
    client: reqwest::Client,
    /// The labeler endpoint URL; the XRPC path and query are set on a clone
    /// of it for each request.
    endpoint: Url,
}

impl RealHttpTee {
    /// Build a tee that sends requests through `client` to the labeler at `endpoint`.
    pub fn new(client: reqwest::Client, endpoint: Url) -> Self {
        Self { client, endpoint }
    }
}

#[async_trait]
impl RawHttpTee for RealHttpTee {
    async fn query_labels(&self, cursor: Option<&str>) -> Result<RawXrpcResponse, HttpStageError> {
        // Build the XRPC endpoint URL.
        let mut url = self.endpoint.clone();
        url.set_path("xrpc/com.atproto.label.queryLabels");

        // Set query parameters.
        {
            let mut query = url.query_pairs_mut();
            query.append_pair("uriPatterns", "*");
            query.append_pair("limit", "50");
            if let Some(c) = cursor {
                query.append_pair("cursor", c);
            }
        }

        let source_url = url.to_string();

        tracing::debug!(
            url = %source_url,
            cursor = ?cursor,
            "http stage: issuing queryLabels GET"
        );

        // Perform the GET request.
        let response =
            self.client
                .get(url.as_str())
                .send()
                .await
                .map_err(|e| HttpStageError::Transport {
                    message: e.to_string(),
                    source: Some(Box::new(e)),
                })?;

        let status = response.status();
        let body_bytes = response
            .bytes()
            .await
            .map_err(|e| HttpStageError::Transport {
                message: e.to_string(),
                source: Some(Box::new(e)),
            })?;

        tracing::debug!(
            url = %source_url,
            status = %status,
            body_len = body_bytes.len(),
            "http stage: queryLabels response received"
        );
        let raw_body: Arc<[u8]> = Arc::from(body_bytes.as_ref());

        // Attempt to decode the response. The atproto JSON encoding wraps
        // every bytes value as `{"$bytes": "<base64>"}`, but atrium-api's
        // generated types annotate byte fields with `#[serde(with = "serde_bytes")]`
        // which expects a raw byte sequence. Parse the body into a
        // `serde_json::Value` first, rewrite every `{"$bytes": "<base64>"}`
        // object into an array of byte integers, then hand the transformed
        // value to atrium for typed decoding.
        let decoded = decode_query_labels_output(&raw_body).map_err(|source| {
            HttpStageError::DecodeFailed {
                raw_body: raw_body.clone(),
                source,
                source_url: source_url.clone(),
            }
        })?;

        Ok(RawXrpcResponse {
            status,
            raw_body,
            decoded,
            source_url,
        })
    }
}

/// Decode a `com.atproto.label.queryLabels` response body into the
/// atrium-generated `query_labels::Output`.
///
/// The body is first parsed as generic JSON, every atproto
/// `{"$bytes": "<base64>"}` wrapper is rewritten into a plain array of byte
/// values, and only then is the typed deserialization attempted.
///
/// Both steps report failures as `serde_json::Error`, so callers can read the
/// error's line and column.
fn decode_query_labels_output(body: &[u8]) -> Result<query_labels::Output, serde_json::Error> {
    serde_json::from_slice::<serde_json::Value>(body).and_then(|mut json| {
        rewrite_atproto_json_bytes(&mut json);
        serde_json::from_value(json)
    })
}

/// Recursively replace every `{"$bytes": "<base64>"}` object inside `value`
/// with a `Value::Array` holding the decoded bytes as integers.
///
/// This follows the atproto JSON representation of `bytes` values (see
/// https://atproto.com/specs/data-model#json-representation). Objects that are
/// not a decodable single-key wrapper are kept as objects and their children
/// are visited instead, so a malformed wrapper is left for the typed
/// deserialization to reject.
fn rewrite_atproto_json_bytes(value: &mut serde_json::Value) {
    use serde_json::Value;

    // Decide first whether this node itself is a wrapper; the shared borrow
    // ends before the node is overwritten.
    let unwrapped = match &*value {
        Value::Object(fields) => decode_atproto_bytes_wrapper(fields),
        _ => None,
    };
    if let Some(bytes) = unwrapped {
        *value = Value::Array(bytes.into_iter().map(Value::from).collect());
        return;
    }

    // Not a wrapper: descend into containers, leave scalars alone.
    match value {
        Value::Object(fields) => fields.values_mut().for_each(rewrite_atproto_json_bytes),
        Value::Array(items) => items.iter_mut().for_each(rewrite_atproto_json_bytes),
        _ => {}
    }
}

/// Decode `map` when it is exactly the single-key object
/// `{"$bytes": "<base64>"}`; return `None` for anything else (extra keys,
/// missing key, non-string value, or invalid base64).
///
/// Trailing `=` padding is stripped before decoding, so both padded and
/// unpadded standard base64 are accepted (the atproto spec says padded, but
/// real servers commonly omit padding).
fn decode_atproto_bytes_wrapper(
    map: &serde_json::Map<String, serde_json::Value>,
) -> Option<Vec<u8>> {
    if map.len() != 1 {
        return None;
    }
    let encoded = map.get("$bytes")?.as_str()?;
    base64::engine::general_purpose::STANDARD_NO_PAD
        .decode(encoded.trim_end_matches('='))
        .ok()
}

/// Work out the `(line, column)` to highlight when showing a decode failure
/// against the pretty-printed body.
///
/// The decode is repeated on `pretty_body` so the position refers to the text
/// the user will actually see rather than the original wire body. If that
/// repeat decode unexpectedly succeeds, the position of `raw_err` is used.
fn decode_error_location_for_display(
    pretty_body: &[u8],
    raw_err: &serde_json::Error,
) -> (usize, usize) {
    let pretty_err = decode_query_labels_output(pretty_body).err();
    let located = pretty_err.as_ref().unwrap_or(raw_err);
    (located.line(), located.column())
}

/// Run the HTTP stage against a labeler endpoint.
///
/// # Arguments
/// * `http` - The HTTP client implementation (usually `RealHttpTee` in production, or fake in tests).
///
/// # Returns
/// `HttpStageOutput` containing check results and facts (if all checks pass).
pub async fn run(http: &dyn RawHttpTee) -> HttpStageOutput {
    let mut results = Vec::new();

    // Fetch first page; derive both endpoint_reachable and query_labels_schema_first_page from the same request.
    let first_response = match http.query_labels(None).await {
        Ok(resp) => {
            // Endpoint is reachable if we got any response (2xx or non-2xx).
            if resp.status.is_success() {
                results.push(Check::EndpointReachable.pass());
            } else {
                let status_code = resp.status;
                results.push(CheckResult {
                    summary: Cow::Owned(format!(
                        "Labeler endpoint reachability (status {status_code})"
                    )),
                    ..Check::EndpointReachable.pass()
                });
            }
            resp
        }
        Err(HttpStageError::Transport { message, .. }) => {
            // Network/TLS error: endpoint is unreachable.
            results.push(CheckResult {
                summary: Cow::Owned(format!("Network error: {message}")),
                ..Check::EndpointReachable.network_error()
            });
            return HttpStageOutput {
                facts: None,
                results,
            };
        }
        Err(HttpStageError::DecodeFailed {
            raw_body,
            source,
            source_url,
        }) => {
            // Endpoint is reachable, but schema decode failed.
            results.push(Check::EndpointReachable.pass());
            let pretty_body = pretty_json_for_display(&raw_body);
            let (line, column) = decode_error_location_for_display(&pretty_body, &source);
            let diagnostic = Box::new(HttpDecodeFailure {
                message: format!("Failed to decode query_labels response: {source}"),
                source_code: NamedSource::new(source_url.clone(), pretty_body.clone()),
                span: Some(span_at_line_column(&pretty_body, line, column)),
            });
            results.push(Check::QueryLabelsSchemaFirstPage.spec_violation(Some(diagnostic)));
            return HttpStageOutput {
                facts: None,
                results,
            };
        }
    };

    // Decode succeeded (we return early on decode failure above).
    let output = &first_response.decoded;
    results.push(Check::QueryLabelsSchemaFirstPage.pass());

    let first_page_labels = output.labels.clone();
    let first_page_raw_bytes = first_response.raw_body.clone();
    let first_page_source_url = first_response.source_url.clone();

    if first_page_labels.is_empty() {
        results.push(Check::QueryLabelsEmptyAdvisory.advisory());
    }

    let pagination_ok = if let Some(cursor) = &output.cursor {
        match http.query_labels(Some(cursor)).await {
            Ok(second_resp) => {
                let second_output = &second_resp.decoded;
                // Check if cursor was actually honored.
                if second_output.labels == first_page_labels {
                    results.push(Check::QueryLabelsSchemaSecondPage.pass());
                    results.push(Check::PaginationIgnoredCursor.spec_violation(None));
                    false
                } else {
                    results.push(Check::QueryLabelsSchemaSecondPage.pass());
                    results.push(Check::PaginationRoundTrip.pass());
                    true
                }
            }
            Err(HttpStageError::Transport { message, .. }) => {
                results.push(CheckResult {
                    summary: Cow::Owned(format!("Network error fetching second page: {message}")),
                    ..Check::QueryLabelsSchemaSecondPage.network_error()
                });
                false
            }
            Err(HttpStageError::DecodeFailed {
                raw_body,
                source,
                source_url,
            }) => {
                let pretty_body = pretty_json_for_display(&raw_body);
                let (line, column) = decode_error_location_for_display(&pretty_body, &source);
                let diagnostic = Box::new(HttpDecodeFailure {
                    message: format!("Failed to decode second page response: {source}"),
                    source_code: NamedSource::new(source_url, pretty_body.clone()),
                    span: Some(span_at_line_column(&pretty_body, line, column)),
                });
                results.push(Check::QueryLabelsSchemaSecondPage.spec_violation(Some(diagnostic)));
                false
            }
        }
    } else {
        // No cursor: pagination not exercised, but that's OK.
        results.push(CheckResult {
            summary: Cow::Borrowed("First page was complete; pagination not exercised"),
            ..Check::PaginationRoundTrip.pass()
        });
        true
    };

    // Facts consumed by the crypto stage.
    let facts = HttpFacts {
        first_page: first_page_labels,
        first_page_raw_bytes,
        first_page_source_url,
        pagination_ok,
    };

    HttpStageOutput {
        facts: Some(facts),
        results,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Parse `json` and run the `$bytes` rewrite over the resulting value.
    fn rewritten(json: &str) -> serde_json::Value {
        let mut parsed: serde_json::Value = serde_json::from_str(json).unwrap();
        rewrite_atproto_json_bytes(&mut parsed);
        parsed
    }

    #[test]
    fn rewrite_atproto_json_bytes_replaces_wrapper() {
        // A nested wrapper is replaced; sibling values are untouched.
        let result = rewritten(r#"{"sig": {"$bytes": "AAECAw"}, "other": 1}"#);
        assert_eq!(result["sig"], serde_json::json!([0, 1, 2, 3]));
        assert_eq!(result["other"], serde_json::json!(1));
    }

    #[test]
    fn rewrite_atproto_json_bytes_accepts_padded_base64() {
        // A top-level wrapper with `=` padding decodes to the same bytes.
        let result = rewritten(r#"{"$bytes": "AAECAw=="}"#);
        assert_eq!(result, serde_json::json!([0, 1, 2, 3]));
    }

    #[test]
    fn rewrite_atproto_json_bytes_ignores_non_wrapper_objects() {
        // Extra keys: not a wrapper, must not be rewritten.
        let input = r#"{"$bytes": "AAECAw", "extra": true}"#;
        let untouched: serde_json::Value = serde_json::from_str(input).unwrap();
        assert_eq!(rewritten(input), untouched);
    }

    #[test]
    fn decode_query_labels_output_handles_dollar_bytes_sig() {
        // Minimal queryLabels Output with one label whose `sig` is wrapped.
        let body = br#"{"cursor":"c","labels":[{"ver":1,"src":"did:plc:aaa22222222222222222bbbbbb","uri":"at://did:plc:aaa22222222222222222bbbbbb/app.bsky.feed.post/abc","val":"spam","cts":"2026-01-01T00:00:00.000Z","sig":{"$bytes":"AAECAw"}}]}"#;
        let decoded = decode_query_labels_output(body).expect("should decode");
        assert_eq!(decoded.labels.len(), 1);
        let sig = decoded.labels[0].sig.as_ref().expect("sig present");
        assert_eq!(*sig, vec![0u8, 1, 2, 3]);
    }
}