tapped 0.1.0

Rust wrapper for the tap ATProto utility
Documentation

tapped

A Rust wrapper library for the tap ATProto sync utility.

tapped provides an idiomatic async Rust interface for spawning and communicating with a tap subprocess, making it easy to build applications that sync data from the ATProto network.

Features

  • Spawn and manage tap subprocesses with graceful shutdown
  • Strongly-typed configuration for all tap envvars
  • Strongly-typed async Rust functions covering all of tap's HTTP API endpoints
  • WebSocket-based event channel with automatic acknowledgment

Installation

Add to your Cargo.toml:

[dependencies]
tapped = "0.1"

You'll also need the tap binary. Build it from the indigo repository:

cd cmd/tap && go build

tapped has been most recently tested against:

tap version v0.0.0-20260114211028-207c9d49d0de-rev-207c9d4

Quick Start

use tapped::{TapHandle, TapConfig, Event};

#[tokio::main]
async fn main() -> tapped::Result<()> {
    let config = TapConfig::builder()
        .database_url("sqlite://tap.db")
        .collection_filter("app.bsky.feed.post")
        .build();

    // Spawn tap and connect
    let handle = TapHandle::spawn_default(config).await?;
    
    // Subscribe to events
    let mut channel = handle.channel().await?;
    
    while let Ok(received) = channel.recv().await {
        match &received.event {
            Event::Record(record) => {
                println!("[{}] {}/{}", 
                    record.action, 
                    record.collection, 
                    record.rkey
                );
            }
            Event::Identity(identity) => {
                println!("Identity: {} -> {}", identity.did, identity.handle);
            }
        }
        // Event is auto-acknowledged when `received` is dropped
    }

    Ok(())
}

Usage Patterns

Connect to Existing Instance

If you have a tap instance already running:

use tapped::TapClient;

let client = TapClient::new("http://localhost:2480")?;
client.health().await?;

Spawn with Custom Binary Path

use tapped::{TapProcess, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://my-app.db")
    .build();

let mut process = TapProcess::spawn("/path/to/tap", config).await?;
let client = process.client();

// Use the client...

process.shutdown().await?;

Using TapHandle (Recommended)

TapHandle combines process management and client access:

use tapped::{TapHandle, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://app.db")
    .full_network(false)
    .build();

let handle = TapHandle::spawn_default(config).await?;

// TapHandle derefs to TapClient, so you can call client methods directly
handle.health().await?;
let count = handle.repo_count().await?;
println!("Tracking {} repos", count);

Configuration Options

use tapped::{TapConfig, LogLevel};
use std::time::Duration;

let config = TapConfig::builder()
    // Database
    .database_url("sqlite://tap.db")
    .max_db_conns(10)
    
    // Network
    .bind("127.0.0.1:2480")
    .relay_url("wss://bsky.network")
    .plc_url("https://plc.directory")
    
    // Filtering
    .signal_collection("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.like")
    .full_network(false)
    
    // Performance
    .firehose_parallelism(10)
    .resync_parallelism(5)
    .outbox_parallelism(10)
    .outbox_capacity(10000)
    
    // Timeouts
    .repo_fetch_timeout(Duration::from_secs(30))
    .startup_timeout(Duration::from_secs(60))
    .shutdown_timeout(Duration::from_secs(10))
    
    // Logging
    .log_level(LogLevel::Info)
    
    .build();

Working with Events

Events are automatically acknowledged when dropped:

use tapped::{Event, RecordAction};

let mut channel = client.channel().await?;

while let Ok(received) = channel.recv().await {
    match &received.event {
        Event::Record(record) => {
            match record.action {
                RecordAction::Create => {
                    if let Some(ref rec) = record.record {
                        // Access the raw JSON
                        println!("Type: {:?}", rec.record_type());
                        
                        // Or deserialize to a specific type
                        // let post: MyPostType = rec.deserialize_as()?;
                    }
                }
                RecordAction::Update => { /* ... */ }
                RecordAction::Delete => { /* ... */ }
                _ => {}
            }
        }
        Event::Identity(identity) => {
            println!("{} is now @{}", identity.did, identity.handle);
        }
    }
    // Ack sent automatically here when `received` goes out of scope
}

Managing Repositories

// Add repos to track
client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;

// Remove repos
client.remove_repos(&["did:plc:abc123"]).await?;

// Get info about a specific repo
let info = client.repo_info("did:plc:def456").await?;
println!("State: {:?}, Records: {}", info.state, info.records);

// Resolve a DID to its document
let doc = client.resolve_did("did:plc:def456").await?;
println!("Handles: {:?}", doc.also_known_as);

Checking Stats

let repos = client.repo_count().await?;
let records = client.record_count().await?;
let outbox = client.outbox_buffer().await?;
let resync = client.resync_buffer().await?;
let cursors = client.cursors().await?;

println!("Tracking {} repos with {} records", repos, records);
println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
println!("Firehose cursor: {:?}", cursors.firehose);

Error Handling

All operations return tapped::Result<T>, which uses the tapped::Error enum:

use tapped::Error;

match client.health().await {
    Ok(()) => println!("Healthy!"),
    Err(Error::Http(e)) => println!("HTTP error: {}", e),
    Err(Error::Timeout) => println!("Request timed out"),
    Err(e) => println!("Other error: {}", e),
}

Example: Syncing Standard Site Records

See examples/standard_site_sync.rs for a complete example that syncs site.standard.publication and site.standard.document records to local files.

cargo run --example standard_site_sync

License

MIT