tapped 0.3.0

Rust wrapper for the tap ATProto utility
# tapped

[![Crates.io Version](https://img.shields.io/crates/v/tapped)](https://crates.io/crates/tapped)
[![docs.rs](https://img.shields.io/docsrs/tapped)](https://docs.rs/tapped)

A Rust wrapper library for the [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) ATProto sync utility.

`tapped` provides an idiomatic async Rust interface for spawning and communicating with a `tap` subprocess, making it easy to build applications that sync data from the ATProto network.

## Features

- Spawn and manage `tap` subprocesses with graceful shutdown
- Strongly-typed configuration covering all of tap's environment variables
- Strongly-typed async Rust functions covering all of tap's HTTP API endpoints
- WebSocket-based event channel with automatic acknowledgment
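Graceful shutdown of a child process usually means giving it a bounded amount of time to exit on its own and only then forcing it. This README doesn't show how `tapped` does this, although the `shutdown_timeout` configuration option suggests a bounded wait. The following is a std-only sketch of the wait-then-kill half of the pattern, built around a hypothetical `wait_or_kill` helper that is not part of `tapped`. On Unix, std's `Child::kill` sends SIGKILL, so a genuinely graceful shutdown would first send SIGTERM (for example via the `nix` crate), which this sketch omits:

```rust
use std::io;
use std::process::{Child, Command, ExitStatus};
use std::thread;
use std::time::{Duration, Instant};

/// Outcome of a bounded wait on a child process (hypothetical type).
#[derive(Debug)]
pub enum Shutdown {
    /// The child exited on its own before the deadline.
    Exited(ExitStatus),
    /// The deadline passed, so the child was force-killed.
    Killed(ExitStatus),
}

/// Wait up to `timeout` for `child` to exit, polling with `try_wait`.
/// If it is still running afterwards, kill it and reap its status.
pub fn wait_or_kill(child: &mut Child, timeout: Duration) -> io::Result<Shutdown> {
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(status) = child.try_wait()? {
            return Ok(Shutdown::Exited(status));
        }
        if Instant::now() >= deadline {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    // On Unix this sends SIGKILL, which the child cannot catch or clean up after.
    child.kill()?;
    Ok(Shutdown::Killed(child.wait()?))
}

fn main() -> io::Result<()> {
    // Assumes a Unix-like system with `sleep` on PATH.
    let mut child = Command::new("sleep").arg("5").spawn()?;
    let outcome = wait_or_kill(&mut child, Duration::from_millis(200))?;
    println!("{:?}", outcome);
    Ok(())
}
```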

## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
tapped = "0.3"
```

You'll also need the `tap` binary. Build it from a checkout of the [indigo repository](https://github.com/bluesky-social/indigo):

```bash
git clone https://github.com/bluesky-social/indigo
cd indigo/cmd/tap && go build
```

`tapped` has been most recently tested against:

```
tap version v0.0.0-20260120225912-12d69fa4d209-rev-12d69fa
```

## Quick Start

```rust
use tapped::{TapProcess, TapConfig, Event};

#[tokio::main]
async fn main() -> tapped::Result<()> {
    let config = TapConfig::builder()
        .database_url("sqlite://tap.db")
        .collection_filter("app.bsky.feed.post")
        .build();

    // Spawn tap - looks in current directory first, then PATH
    let process = TapProcess::spawn_default(config).await?;
    let client = process.client()?;

    // Subscribe to events
    let mut channel = client.channel().await?;
    
    while let Ok(received) = channel.recv().await {
        match &received.event {
            Event::Record(record) => {
                println!("[{:?}] {}/{}",
                    record.action,
                    record.collection,
                    record.rkey
                );
            }
            Event::Identity(identity) => {
                println!("Identity: {} -> {}", identity.did, identity.handle);
            }
        }
        // Event is auto-acknowledged when `received` is dropped
    }

    Ok(())
}
```
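The comment in the example says `spawn_default` looks for `tap` in the current directory first and then on `PATH`. Below is a rough std-only approximation of that lookup order, using a hypothetical `find_binary` helper rather than `tapped`'s actual implementation. It does not check the executable bit, and on Windows you would look for `tap.exe` instead:

```rust
use std::env;
use std::ffi::OsStr;
use std::path::{Path, PathBuf};

/// Hypothetical helper: return the first existing file named `name`,
/// checking `cwd` first and then each directory of a PATH-style value.
fn find_binary(name: &str, cwd: &Path, path_var: Option<&OsStr>) -> Option<PathBuf> {
    let local = cwd.join(name);
    if local.is_file() {
        return Some(local);
    }
    // No PATH at all means there is nowhere else to look.
    let path_var = path_var?;
    env::split_paths(path_var)
        .map(|dir| dir.join(name))
        .find(|candidate| candidate.is_file())
}

fn main() {
    let cwd = env::current_dir().expect("current directory should be readable");
    let path_var = env::var_os("PATH");
    match find_binary("tap", &cwd, path_var.as_deref()) {
        Some(p) => println!("would spawn {}", p.display()),
        None => println!("tap not found in the current directory or PATH"),
    }
}
```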

## Usage Patterns

### Connect to Existing Instance

If you have a tap instance already running:

```rust
use tapped::TapClient;

let client = TapClient::new("http://localhost:2480")?;
client.health().await?;
```

### Spawn an Instance

```rust
use tapped::{TapProcess, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://app.db")
    .full_network(false)
    .build();

// If you need a custom binary path, use spawn() instead:
// let process = TapProcess::spawn("/path/to/tap", config).await?;
let process = TapProcess::spawn_default(config).await?;
let client = process.client()?;

// Use the client
client.health().await?;
let count = client.repo_count().await?;
println!("Tracking {} repos", count);
```
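The `startup_timeout` option suggests that spawning waits a bounded time for tap to come up. The exact mechanism isn't documented here. The same concern applies when connecting to an instance that may still be starting, where an early `client.health()` call could fail. The general pattern is a polling loop with a deadline. Below is a std-only sketch with a hypothetical `wait_until_ready` helper; in real code the `check` closure would wrap a health probe:

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Call `check` every `interval` until it returns true or `timeout` elapses.
/// Returns whether readiness was observed before the deadline.
fn wait_until_ready<F>(mut check: F, timeout: Duration, interval: Duration) -> bool
where
    F: FnMut() -> bool,
{
    let deadline = Instant::now() + timeout;
    loop {
        if check() {
            return true;
        }
        let now = Instant::now();
        if now >= deadline {
            return false;
        }
        // Never sleep past the deadline.
        thread::sleep(interval.min(deadline - now));
    }
}

fn main() {
    // Stand-in for a health probe that succeeds on the third attempt.
    let mut attempts = 0;
    let ready = wait_until_ready(
        || {
            attempts += 1;
            attempts >= 3
        },
        Duration::from_secs(1),
        Duration::from_millis(10),
    );
    println!("ready = {ready} after {attempts} attempts");
}
```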

### Configuration Options

```rust
use tapped::{TapConfig, LogLevel};
use std::time::Duration;

let config = TapConfig::builder()
    // Database
    .database_url("sqlite://tap.db")
    .max_db_conns(10)
    
    // Network
    .bind("127.0.0.1:2480")
    .relay_url("wss://bsky.network".parse().unwrap())
    .plc_url("https://plc.directory".parse().unwrap())
    
    // Filtering
    .signal_collection("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.like")
    .full_network(false)
    
    // Performance
    .firehose_parallelism(10)
    .resync_parallelism(5)
    .outbox_parallelism(10)
    .outbox_capacity(10000)
    
    // Timeouts
    .repo_fetch_timeout(Duration::from_secs(30))
    .startup_timeout(Duration::from_secs(60))
    .shutdown_timeout(Duration::from_secs(10))
    
    // Logging
    .log_level(LogLevel::Info)
    
    .build();
```
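Because tap is configured through environment variables, a typed builder like the one above ultimately has to render its fields as `NAME=value` pairs for the child process. Below is a minimal sketch of that idea using a hypothetical, trimmed-down `MiniConfig`, which is not `tapped::TapConfig`. The variable names are placeholders, not confirmed tap variable names. The sketch also assumes that tap accepts repeated collection filters as one comma-separated value and durations in Go's `30s` style:

```rust
use std::process::Command;
use std::time::Duration;

/// Hypothetical config used only to illustrate the idea.
#[derive(Default)]
struct MiniConfig {
    database_url: Option<String>,
    collection_filters: Vec<String>,
    full_network: Option<bool>,
    repo_fetch_timeout: Option<Duration>,
}

impl MiniConfig {
    /// Render the fields that were set as (NAME, value) pairs.
    /// The names are placeholders; check tap's documentation for real ones.
    fn to_env(&self) -> Vec<(String, String)> {
        let mut env = Vec::new();
        if let Some(url) = &self.database_url {
            env.push(("TAP_DATABASE_URL".to_string(), url.clone()));
        }
        if !self.collection_filters.is_empty() {
            // Repeated builder calls collapse into one comma-separated value.
            env.push(("TAP_COLLECTION_FILTERS".to_string(), self.collection_filters.join(",")));
        }
        if let Some(full) = self.full_network {
            env.push(("TAP_FULL_NETWORK".to_string(), full.to_string()));
        }
        if let Some(t) = self.repo_fetch_timeout {
            // Whole seconds with an "s" suffix, e.g. "30s".
            env.push(("TAP_REPO_FETCH_TIMEOUT".to_string(), format!("{}s", t.as_secs())));
        }
        env
    }
}

fn main() {
    let config = MiniConfig {
        database_url: Some("sqlite://tap.db".into()),
        collection_filters: vec!["app.bsky.feed.post".into(), "app.bsky.feed.like".into()],
        full_network: Some(false),
        repo_fetch_timeout: Some(Duration::from_secs(30)),
    };
    // Only configures the command; nothing is spawned here.
    let mut cmd = Command::new("tap");
    cmd.envs(config.to_env());
    for (k, v) in config.to_env() {
        println!("{k}={v}");
    }
}
```

Unset fields are simply omitted, so tap would fall back to its own defaults for them.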

### Working with Events

Each event is acknowledged automatically when the value returned by `recv()` is dropped:

```rust
use tapped::{Event, RecordAction};

let mut channel = client.channel().await?;

while let Ok(received) = channel.recv().await {
    match &received.event {
        Event::Record(record) => {
            match record.action {
                RecordAction::Create => {
                    // Access the raw JSON as a string
                    if let Some(json) = record.record_as_str() {
                        println!("Raw JSON: {}", json);
                    }

                    // Or deserialize to a specific type
                    // let post: MyPostType = record.deserialize_as()?;
                }
                RecordAction::Update => { /* ... */ }
                RecordAction::Delete => { /* ... */ }
                _ => {}
            }
        }
        Event::Identity(identity) => {
            println!("{} is now @{}", identity.did, identity.handle);
        }
    }
    // Ack sent automatically here when `received` goes out of scope
}
```
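Acknowledging on drop is an instance of Rust's RAII pattern: the wrapper's `Drop` implementation sends the acknowledgment. Below is a std-only sketch of the mechanism, with a hypothetical `Received<T>` type and an `mpsc` channel standing in for the WebSocket. It is not `tapped`'s actual type:

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical stand-in for a received event: it carries the event id
/// and a handle back to whatever sends acks upstream.
struct Received<T> {
    id: u64,
    event: T,
    acks: Sender<u64>,
}

impl<T> Drop for Received<T> {
    fn drop(&mut self) {
        // Ignore the error if the ack receiver has already gone away.
        let _ = self.acks.send(self.id);
    }
}

fn main() {
    let (ack_tx, ack_rx) = channel();
    for (id, text) in [(1, "first"), (2, "second")] {
        let received = Received { id, event: text, acks: ack_tx.clone() };
        println!("handling {}", received.event);
        // `received` is dropped at the end of this iteration, sending its ack.
    }
    drop(ack_tx);
    let acked: Vec<u64> = ack_rx.iter().collect();
    println!("acked: {:?}", acked); // prints "acked: [1, 2]"
}
```

One consequence of this design, at least in the sketch, is that holding on to the value (for example, moving it into a spawned task) delays its ack. Also, a panic during handling still drops the value during unwinding, so the event is acknowledged anyway. The README doesn't say whether `tapped` behaves the same way in that case.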

### Managing Repositories

```rust
// Add repos to track
client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;

// Remove repos
client.remove_repos(&["did:plc:abc123"]).await?;

// Get info about a specific repo
let info = client.repo_info("did:plc:def456").await?;
println!("State: {:?}, Records: {}", info.state, info.records);

// Resolve a DID to its document
let doc = client.resolve_did("did:plc:def456").await?;
println!("Handles: {:?}", doc.also_known_as);
```
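`add_repos` takes DIDs. If you collect them from user input, a cheap syntactic pre-check can catch obvious mistakes, such as a handle pasted where a DID was expected, before making a request. Below is a sketch of a hypothetical `looks_like_did` helper that loosely follows the atproto DID syntax rules: a `did:` prefix, a lowercase method, and an identifier made of ASCII letters, digits, and `._:%-` that does not end in `:` or `%`. It does not resolve anything, and tap may apply different rules:

```rust
/// Loose syntactic check for a DID such as "did:plc:abc123".
fn looks_like_did(s: &str) -> bool {
    let Some(rest) = s.strip_prefix("did:") else {
        return false;
    };
    let Some((method, id)) = rest.split_once(':') else {
        return false;
    };
    let method_ok = !method.is_empty() && method.bytes().all(|b| b.is_ascii_lowercase());
    let id_ok = !id.is_empty()
        && id.bytes().all(|b| b.is_ascii_alphanumeric() || b"._:%-".contains(&b))
        && !id.ends_with(':')
        && !id.ends_with('%');
    method_ok && id_ok
}

fn main() {
    let input = ["did:plc:abc123", "did:web:example.com", "@alice.bsky.social", "did:PLC:abc"];
    let valid: Vec<&str> = input.into_iter().filter(|s| looks_like_did(s)).collect();
    println!("{:?}", valid); // prints ["did:plc:abc123", "did:web:example.com"]
}
```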

### Checking Stats

```rust
let repos = client.repo_count().await?;
let records = client.record_count().await?;
let outbox = client.outbox_buffer().await?;
let resync = client.resync_buffer().await?;
let cursors = client.cursors().await?;

println!("Tracking {} repos with {} records", repos, records);
println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
println!("Firehose cursor: {:?}", cursors.firehose);
```
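A single count is a snapshot. A common follow-up is to poll a counter such as `record_count()` twice and derive a rate. Below is a std-only sketch of that arithmetic with a hypothetical `rate_per_sec` helper, assuming the counts fit in `u64`. With the real client, you would take the two readings around a measured `Instant`. Counts can also go down, for example after `remove_repos`, so the helper returns `None` instead of a negative rate:

```rust
use std::time::Duration;

/// Per-second rate between two readings of a counter.
/// Returns None if no time elapsed or the counter went backwards.
fn rate_per_sec(previous: u64, current: u64, elapsed: Duration) -> Option<f64> {
    let secs = elapsed.as_secs_f64();
    if secs <= 0.0 || current < previous {
        return None;
    }
    Some((current - previous) as f64 / secs)
}

fn main() {
    // Two readings taken 2 s apart.
    let rate = rate_per_sec(1_000, 1_300, Duration::from_secs(2));
    println!("{:?}", rate); // prints "Some(150.0)"
}
```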

## Example: Syncing standard.site Records with Schema Generation and Validation

The repository includes a complete example demonstrating how to sync and validate ATProto records using `tapped` together with the [jacquard](https://crates.io/crates/jacquard) crates.

The jacquard ecosystem can generate Rust structs from lexicon JSON files and validate records against their lexicon constraints at runtime.

```
tapped/
├── tapped/                 # The main tapped library
├── lexicons-example/       # Generated types from lexicon schemas
│   ├── lexicons/           # Source lexicon JSON files
│   │   ├── site.standard.publication.json
│   │   ├── site.standard.document.json
│   │   └── ...
│   └── src/                # Generated Rust code
└── standard-site-sync/     # Example binary using both packages
```

The code in `lexicons-example/src` was generated like so:

```bash
# Install the code generator
cargo install jacquard-lexgen

jacquard-codegen -i lexicons-example/lexicons -o lexicons-example/src
```

This produces strongly-typed structs with built-in validation. For example, the `site.standard.publication` lexicon becomes:

```rust
use lexicons_example::site_standard::publication::Publication;

// Deserialize from JSON
let publication: Publication = serde_json::from_str(json)?;

// Validate against lexicon constraints (max length, grapheme limits, etc.)
publication.validate()?;

// Access typed fields
println!("Name: {}", publication.name.as_str());
println!("URL: {}", publication.url.as_str());
```
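`validate()` enforces constraints such as string length limits. In the Lexicon schema language, a string's `maxLength` is counted in UTF-8 bytes, while `maxGraphemes` is counted in Unicode grapheme clusters, so the two can disagree for the same value. Below is a hand-written sketch of just the byte-length check, using a hypothetical `check_max_length` helper rather than jacquard's generated code. Grapheme counting needs Unicode segmentation, which the standard library doesn't provide, so it is not shown:

```rust
/// Hypothetical helper mirroring a Lexicon `maxLength` string constraint,
/// which counts UTF-8 bytes rather than characters.
fn check_max_length(field: &str, value: &str, max_bytes: usize) -> Result<(), String> {
    let len = value.len(); // str::len is the UTF-8 byte length
    if len > max_bytes {
        Err(format!("{field}: {len} bytes exceeds maxLength {max_bytes}"))
    } else {
        Ok(())
    }
}

fn main() {
    let name = "Caf\u{e9}"; // "Café": 4 chars, but 'é' takes 2 bytes in UTF-8
    assert_eq!(name.chars().count(), 4);
    println!("{:?}", check_max_length("name", name, 4));
    // prints Err("name: 5 bytes exceeds maxLength 4")
}
```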

For more detail, see `process_record_event` [in `main.rs`](https://tangled.org/octet-stream.net/tapped/blob/main/standard-site-sync/src/main.rs#L140-201).

## License

MIT