synta 0.2.3

ASN.1 parser, decoder, and encoder library with DER/BER support and C FFI
Documentation
#!/usr/bin/env python3
"""doc-rust-samples.py — extract and wrap Rust code blocks from Synta docs.

Called by doc-rust-samples.sh with:
    python3 doc-rust-samples.py [--features-serde] <work_dir> <file1.md> [file2.md ...]

Outputs to stdout a single line:
    total_blocks<TAB>skip_blocks

Writes to <work_dir>:

  raw/NNNNN.rs          — raw extracted code (no wrapper)
  src/NNNNN.rs          — wrapped, ready-to-compile translation unit
  src/NNNNN.combined.rs — preceding toplevel/program blocks prepended before
                          the current block (only when preceding blocks exist
                          in the same doc file); used for the retry step
  manifest.tsv          — tab-separated, one row per compilable block:
      doc_file  start_line  lang  src_file  kind  raw_file  combined_file

kind values
-----------
  skip_annotated — fenced block has ``ignore`` or ``compile_fail`` annotation
  skip_nonsynta  — no ``synta`` identifier found; skip
  skip_nostd     — uses #![no_std]; needs a cross-compilation target; skip
  skip_foreign   — uses a foreign ASN.1 crate (der, asn1-rs, yasna, asn1); skip
  skip_serde     — uses serde_json (requires --features serde to be passed); skip
  program        — has ``fn main()``; main is renamed, compiled as lib
  toplevel       — top-level fn/struct/impl/type/etc definition
  fragment       — standalone statements; wrapped in a harness function
"""

import os
import re
import sys

# Set to True when the caller passes --features-serde so that serde_json
# blocks are included in the manifest rather than skipped.
# Written by main() while parsing the command line; read by classify().
_SERDE_ENABLED = False


# ── Regexps ──────────────────────────────────────────────────────────────────

# Block must reference the synta crate to be worth compiling.
# Case-insensitive so that migration-comparison blocks containing a comment
# like "// Synta" still reach FOREIGN_RE (which skips them correctly) rather
# than being dropped as skip_nonsynta before FOREIGN_RE is ever checked.
SYNTA_RE = re.compile(r"\bsynta\b", re.IGNORECASE)

# Blocks using #![no_std] need a bare-metal / embedded cross-compilation
# target that is not available in this validator.  Skip them.
NOSTD_RE = re.compile(r"#!\[no_std\]")

# serde_json is a dev-dependency of the synta workspace.  Snippets that use
# serde_json are only valid when the crate is built with --features serde.
# Skip them unless the shell script enables serde (see --features-serde).
#
# Typical trigger line:
#   let json = serde_json::to_string(&value).unwrap();
SERDE_RE = re.compile(r"\bserde_json\b")

# Identifiers that indicate comparison code using a foreign Rust ASN.1 crate.
# These blocks show side-by-side migration examples and cannot be compiled
# because the foreign crate is not a project dependency.
#
# Typical trigger lines:
#   use der::{Decode, Encode};
#   use asn1_rs::FromDer;
#   use yasna;
#   use asn1::...;
#
# The der / asn1_rs / asn1 alternatives require a following "::" path
# separator; the yasna alternative matches a bare "use yasna" as well.
FOREIGN_RE = re.compile(
    r"(?:^|\W)use\s+der\s*::"
    r"|(?:^|\W)use\s+asn1_rs\s*::"
    r"|(?:^|\W)use\s+yasna\b"
    r"|(?:^|\W)use\s+asn1\s*::",
    re.MULTILINE,
)

# fn main() → complete program block.
# Only the "fn main()" token sequence (with an empty parameter list) is
# required, so prefixed forms such as "pub fn main()" match too.
MAIN_RE = re.compile(r"\bfn\s+main\s*\(\s*\)")

# Top-level Rust item definitions that begin at column 0 (possibly prefixed
# with a visibility modifier and/or qualifiers).  Covers:
#   fn, async fn, unsafe fn, extern "C" fn,
#   struct, enum, impl (including impl Trait for Type),
#   type alias, const, static, trait, macro_rules!
# re.MULTILINE makes "^" match at the start of every line, so one matching
# line anywhere in the block is enough.
TOPLEVEL_RE = re.compile(
    r"^(?:pub(?:\s*\([^)]*\))?\s+)?"  # optional visibility (pub / pub(crate) / …)
    r"(?:async\s+)?(?:unsafe\s+)?"  # optional qualifiers
    r'(?:extern\s+"[^"]+"\s+)?'  # optional extern ABI
    r"(?:fn|struct|enum|impl|type|const|static|trait|macro_rules!)\b",
    re.MULTILINE,
)

# Fenced-block annotations that mean "this block is deliberately not
# compilable" and should be skipped entirely.
#   ignore       — rustdoc explicitly ignores this block
#   compile_fail — block is expected to fail compilation (anti-pattern doc)
SKIP_ANNOTATIONS = frozenset({"ignore", "compile_fail"})


# ── Source wrappers ──────────────────────────────────────────────────────────

# File-level preamble applied to every generated translation unit.
# The glob import brings all publicly re-exported Synta types into scope
# so that snippets using the full path (synta::Integer) or the short form
# (Integer) both resolve without error.
# The inner #![allow(...)] attribute suppresses the listed unused / dead-code
# lints for the whole generated file.  Every wrapper emits PREAMBLE first.
PREAMBLE = """\
// Auto-generated wrapper for Synta documentation snippet.
#![allow(
    unused_imports,
    unused_variables,
    unused_mut,
    dead_code,
    unused_must_use,
    unreachable_code,
    unused_assignments,
)]
use synta::*;
"""

# Pre-declared identifiers for fragment harness functions.  These cover
# variables that appear in doc snippets which show partial code assuming
# context supplied by the surrounding prose (e.g. a Decoder already
# constructed by a preceding paragraph).
#
# Pre-declarations live in the outer scope; the fragment itself is placed
# in a nested { } block, so re-declaring any of these identifiers inside
# the fragment simply shadows the outer binding rather than causing a
# "duplicate declaration" error.
#
# The text is already indented four spaces and is inserted verbatim at the
# top of the harness function body by wrap_fragment() and wrap_combined().
FRAGMENT_VARS = """\
    // ── pre-declared identifiers for doc fragment compilation ──
    let data: Vec<u8> = vec![0x02, 0x01, 0x2a];
    let mut decoder = synta::Decoder::new(data.as_slice(), synta::Encoding::Der);
    let mut encoder = synta::Encoder::new(synta::Encoding::Der);
    let integer     = synta::Integer::from(0i64);
"""

# The harness function return type uses the fully-qualified std path so it
# is not shadowed by synta::Result<T>, which only accepts a single type
# parameter (the error type is always synta::Error).
# Interpolated into the "fn _synta_doc_sample() -> ..." signature line.
_HARNESS_RETURN = "std::result::Result<(), Box<dyn std::error::Error>>"


def classify(code: str, annotation: str) -> str:
    """Classify a Rust code block.  Returns one of the kind strings above.

    Args:
        code: Raw text of the fenced block (without the fence lines).
        annotation: Text that followed ``rust,`` on the opening fence, or an
            empty string.  It may be a comma-separated list such as
            ``compile_fail,E0277`` or ``no_run,ignore``; the block is skipped
            when *any* entry is listed in ``SKIP_ANNOTATIONS``.

    Returns:
        The first matching kind, checked in this order: ``skip_annotated``,
        ``skip_nonsynta``, ``skip_nostd``, ``skip_foreign``, ``skip_serde``
        (only while ``_SERDE_ENABLED`` is false), ``program``, ``toplevel``,
        and finally ``fragment`` as the fallback.
    """
    # A fence info string can carry several comma-separated attributes, so
    # test each one instead of comparing the whole string.
    tags = {tag.strip() for tag in annotation.split(",")}
    if any(tag in SKIP_ANNOTATIONS for tag in tags):
        return "skip_annotated"
    if not SYNTA_RE.search(code):
        return "skip_nonsynta"
    if NOSTD_RE.search(code):
        return "skip_nostd"
    if FOREIGN_RE.search(code):
        return "skip_foreign"
    if SERDE_RE.search(code) and not _SERDE_ENABLED:
        return "skip_serde"
    if MAIN_RE.search(code):
        return "program"
    if TOPLEVEL_RE.search(code):
        return "toplevel"
    return "fragment"


def wrap_program(code: str) -> str:
    """Return *code* behind the preamble with ``fn main()`` renamed.

    Every ``fn main()`` occurrence becomes ``fn _synta_doc_main()`` so the
    snippet can be built as a library crate.
    """
    main_decl = re.compile(r"\bfn\s+main\s*\(\s*\)")
    body = main_decl.sub("fn _synta_doc_main()", code)
    return "".join([PREAMBLE, "\n", body, "\n"])


def wrap_toplevel(code: str) -> str:
    """Return *code* unchanged at file scope, preceded by the preamble."""
    pieces = (PREAMBLE, "\n", code, "\n")
    return "".join(pieces)


def wrap_fragment(code: str) -> str:
    """Embed a statement-level snippet in a compilable harness function.

    The generated ``_synta_doc_sample`` function returns the fully-qualified
    ``std::result::Result<(), Box<dyn std::error::Error>>`` (so it cannot be
    shadowed by ``synta::Result<T>``), which lets the snippet use ``?``
    freely.  The pre-declared identifiers come first; the snippet follows in
    its own nested ``{ }`` scope, indented eight spaces, so that any
    identifier it re-declares shadows the outer binding instead of clashing
    with it.
    """
    pad = " " * 8
    body = "\n".join([pad + row for row in code.splitlines()])
    pieces = [
        PREAMBLE,
        f"\nfn _synta_doc_sample() -> {_HARNESS_RETURN} {{\n",
        FRAGMENT_VARS,
        "    // ── fragment (nested scope allows re-declaration) ──\n",
        "    {\n",
        body,
        "\n",
        "    }\n",
        "    Ok(())\n",
        "}\n",
    ]
    return "".join(pieces)


def wrap(code: str, kind: str) -> str:
    """Wrap *code* with the wrapper that matches *kind*.

    ``"program"`` and ``"toplevel"`` select their dedicated wrappers; any
    other value is treated as a fragment.
    """
    if kind == "program":
        wrapper = wrap_program
    elif kind == "toplevel":
        wrapper = wrap_toplevel
    else:
        wrapper = wrap_fragment
    return wrapper(code)


def wrap_combined(prev_codes: list[str], cur_code: str, cur_kind: str) -> str:
    """Combine preceding toplevel/program blocks with the current block.

    Preceding blocks are placed at file scope so that any types, functions,
    or constants they define are visible to the current block.  The current
    block is then wrapped according to its own kind.

    Args:
        prev_codes: Raw code of the earlier toplevel/program blocks, in
            document order.
        cur_code: Raw code of the block being compiled.
        cur_kind: ``"fragment"``, ``"program"`` or anything else (treated as
            ``"toplevel"``).

    Returns:
        The text of a single translation unit: preamble, the preceding
        blocks, then the wrapped current block.
    """
    main_decl = re.compile(r"\bfn\s+main\s*\(\s*\)")

    # Each preceding program block carries its own ``fn main()``.  Left
    # as-is, two of them at the same file scope are a duplicate definition
    # and the combined file can never compile, so give each one a unique
    # name derived from its position.
    earlier = [
        main_decl.sub(f"fn _synta_doc_prev_main_{idx}()", code)
        for idx, code in enumerate(prev_codes)
    ]
    prefix = PREAMBLE + "\n" + "\n".join(earlier) + "\n"

    if cur_kind == "fragment":
        # Same harness shape as the standalone fragment wrapper: the
        # pre-declared identifiers, then the snippet in a nested scope.
        indented = "\n".join("        " + line for line in cur_code.splitlines())
        return (
            prefix
            + f"\nfn _synta_doc_sample() -> {_HARNESS_RETURN} {{\n"
            + FRAGMENT_VARS
            + "    // ── fragment (nested scope allows re-declaration) ──\n"
            + "    {\n"
            + indented
            + "\n"
            + "    }\n"
            + "    Ok(())\n"
            + "}\n"
        )
    if cur_kind == "program":
        renamed = main_decl.sub("fn _synta_doc_main()", cur_code)
        return prefix + "\n" + renamed + "\n"
    # toplevel: concatenate at file scope
    return prefix + "\n" + cur_code + "\n"


# ── Main extraction loop ──────────────────────────────────────────────────────


def main() -> None:
    """Extract Rust fenced blocks from Markdown files and write wrapped sources.

    Command line (taken from ``sys.argv``)::

        [--features-serde] <work_dir> [file.md ...]

    ``--features-serde`` is recognised only as the first argument and sets
    the module-level ``_SERDE_ENABLED`` flag.  Every non-empty rust fenced
    block is numbered and its raw text written to ``<work_dir>/raw``.  Blocks
    whose kind does not start with ``skip`` are also wrapped into
    ``<work_dir>/src`` (plus a ``.combined.rs`` variant when earlier
    toplevel/program blocks exist in the same file) and get one row in
    ``<work_dir>/manifest.tsv``.  Finally a single
    ``total_blocks<TAB>skip_blocks`` line is printed to stdout.

    Input files that cannot be read or decoded as UTF-8, and fenced blocks
    that are never closed, produce a warning on stderr and are otherwise
    ignored.  Exits with status 1 when no ``<work_dir>`` is given.
    """
    global _SERDE_ENABLED

    usage = f"Usage: {sys.argv[0]} [--features-serde] <work_dir> [file.md ...]"

    args = sys.argv[1:]
    # Strip optional --features-serde flag before positional arguments.
    if args and args[0] == "--features-serde":
        _SERDE_ENABLED = True
        args = args[1:]
    if not args:
        print(usage, file=sys.stderr)
        sys.exit(1)

    work_dir, md_files = args[0], args[1:]

    raw_dir = os.path.join(work_dir, "raw")
    src_dir = os.path.join(work_dir, "src")
    os.makedirs(raw_dir, exist_ok=True)
    os.makedirs(src_dir, exist_ok=True)

    # Matches ```rust or ```rust,<annotation> fenced-block openers.
    fence_open = re.compile(r"^```rust(?:,(\S+))?\s*$", re.IGNORECASE)

    manifest_rows: list[str] = []
    block_n = 0
    skip_n = 0

    # Per-file accumulation of raw code from toplevel/program blocks.  When a
    # later block in the same file fails to compile standalone (e.g. it uses a
    # type or helper function defined in an earlier block), the shell script
    # retries with a combined file that prepends ALL preceding toplevel/program
    # blocks from the same document.  Key = md_file path, value = list of raw
    # code strings in document order.
    file_toplevel_history: dict[str, list[str]] = {}

    for md_path in md_files:
        try:
            with open(md_path, encoding="utf-8") as fh:
                lines = fh.readlines()
        except (OSError, UnicodeDecodeError) as exc:
            # UnicodeDecodeError is not an OSError; without it one badly
            # encoded file would abort the whole run.
            print(f"warning: cannot read {md_path}: {exc}", file=sys.stderr)
            continue

        in_block = False
        annotation = ""
        start_line = 0
        buf: list[str] = []

        for lineno, line in enumerate(lines, 1):
            if not in_block:
                opener = fence_open.match(line)
                if opener:
                    annotation = (opener.group(1) or "").lower()
                    in_block = True
                    # +1: opening ``` line is not part of the block
                    start_line = lineno + 1
                    buf = []
                continue

            if not line.startswith("```"):
                buf.append(line)
                continue

            # Closing fence reached.
            in_block = False
            if not buf:
                continue

            block_n += 1
            code = "".join(buf)
            buf = []

            # The raw text is written for every block, skipped or not.
            raw_path = os.path.join(raw_dir, f"{block_n:05d}.rs")
            with open(raw_path, "w", encoding="utf-8") as fh:
                fh.write(code)

            kind = classify(code, annotation)
            if kind.startswith("skip"):
                # Only compilable blocks appear in the manifest.  Skipped
                # blocks are excluded to avoid empty-field issues when bash
                # reads tab-separated fields.
                skip_n += 1
                continue

            src_path = os.path.join(src_dir, f"{block_n:05d}.rs")
            with open(src_path, "w", encoding="utf-8") as fh:
                fh.write(wrap(code, kind))

            # Build a combined file so the shell script can retry when the
            # current block depends on types or functions defined in a
            # preceding block of the same doc file.
            combined_path = ""
            prev = file_toplevel_history.get(md_path, [])
            if prev:
                combined_path = os.path.join(src_dir, f"{block_n:05d}.combined.rs")
                with open(combined_path, "w", encoding="utf-8") as fh:
                    fh.write(wrap_combined(prev, code, kind))

            # Record this block only after its own combined file is written,
            # so a block is never prepended to itself.
            if kind in ("toplevel", "program"):
                file_toplevel_history.setdefault(md_path, []).append(code)

            manifest_rows.append(
                "\t".join(
                    [
                        md_path,
                        str(start_line),
                        "rust",
                        src_path,
                        kind,
                        raw_path,
                        combined_path,
                    ]
                )
            )

        if in_block:
            # The file ended inside a fenced block; its content is not
            # extracted, so say so instead of dropping it silently.
            print(
                f"warning: {md_path}:{start_line - 1}: "
                "unterminated rust block ignored",
                file=sys.stderr,
            )

    manifest_path = os.path.join(work_dir, "manifest.tsv")
    with open(manifest_path, "w", encoding="utf-8") as fh:
        fh.writelines(row + "\n" for row in manifest_rows)

    # Single line on stdout consumed by the shell script:
    # total_blocks<TAB>skip_blocks
    print(f"{block_n}\t{skip_n}")


# Run the extractor only when this file is executed as a script.
if __name__ == "__main__":
    main()