# tapped
[crates.io](https://crates.io/crates/tapped) | [docs.rs](https://docs.rs/tapped)

A Rust wrapper library for the [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) ATProto sync utility.
`tapped` provides an idiomatic async Rust interface for spawning and communicating with a `tap` subprocess, making it easy to build applications that sync data from the ATProto network.
## Features
- Spawn and manage `tap` subprocesses with graceful shutdown
- Strongly-typed configuration covering all of tap's environment variables
- Strongly-typed async Rust functions covering all of tap's HTTP API endpoints
- WebSocket-based event channel with automatic acknowledgment
## Installation
Add to your `Cargo.toml`:
```toml
[dependencies]
tapped = "0.3"
```
You'll also need the `tap` binary. Build it from a checkout of the [indigo repository](https://github.com/bluesky-social/indigo):
```bash
cd cmd/tap && go build
```
`tapped` has been most recently tested against:
```
tap version v0.0.0-20260120225912-12d69fa4d209-rev-12d69fa
```
## Quick Start
```rust
use tapped::{TapProcess, TapConfig, Event};

#[tokio::main]
async fn main() -> tapped::Result<()> {
    let config = TapConfig::builder()
        .database_url("sqlite://tap.db")
        .collection_filter("app.bsky.feed.post")
        .build();

    // Spawn tap - looks in current directory first, then PATH
    let process = TapProcess::spawn_default(config).await?;
    let client = process.client()?;

    // Subscribe to events
    let mut channel = client.channel().await?;

    while let Ok(received) = channel.recv().await {
        match &received.event {
            Event::Record(record) => {
                println!(
                    "[{:?}] {}/{}",
                    record.action,
                    record.collection,
                    record.rkey
                );
            }
            Event::Identity(identity) => {
                println!("Identity: {} -> {}", identity.did, identity.handle);
            }
        }
        // Event is auto-acknowledged when `received` is dropped
    }

    Ok(())
}
```
## Usage Patterns
### Connect to Existing Instance
If you have a tap instance already running:
```rust
use tapped::TapClient;
let client = TapClient::new("http://localhost:2480")?;
client.health().await?;
```
### Spawn an Instance
```rust
use tapped::{TapProcess, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://app.db")
    .full_network(false)
    .build();

// If you need a custom binary path, use spawn() instead:
// let process = TapProcess::spawn("/path/to/tap", config).await?;
let process = TapProcess::spawn_default(config).await?;
let client = process.client()?;

// Use the client
client.health().await?;
let count = client.repo_count().await?;
println!("Tracking {} repos", count);
```
### Configuration Options
```rust
use tapped::{TapConfig, LogLevel};
use std::time::Duration;

let config = TapConfig::builder()
    // Database
    .database_url("sqlite://tap.db")
    .max_db_conns(10)
    // Network
    .bind("127.0.0.1:2480")
    .relay_url("wss://bsky.network".parse().unwrap())
    .plc_url("https://plc.directory".parse().unwrap())
    // Filtering
    .signal_collection("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.like")
    .full_network(false)
    // Performance
    .firehose_parallelism(10)
    .resync_parallelism(5)
    .outbox_parallelism(10)
    .outbox_capacity(10000)
    // Timeouts
    .repo_fetch_timeout(Duration::from_secs(30))
    .startup_timeout(Duration::from_secs(60))
    .shutdown_timeout(Duration::from_secs(10))
    // Logging
    .log_level(LogLevel::Info)
    .build();
```
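
Since the builder is described as a typed front end for tap's environment variables, it may help to picture how typed fields could be flattened into variables for a child process. The following is a minimal std-only sketch, not the crate's implementation: `SketchConfig`, `to_env`, and `command_for` are hypothetical names, and the variable names and value formats are assumptions that should be checked against tap's own documentation.

```rust
use std::process::Command;
use std::time::Duration;

/// Hypothetical, simplified config; not the real `TapConfig`.
struct SketchConfig {
    database_url: String,
    collection_filters: Vec<String>,
    full_network: bool,
    repo_fetch_timeout: Duration,
}

impl SketchConfig {
    /// Flattens the typed fields into (name, value) string pairs.
    /// The variable names below are assumed, not confirmed tap settings.
    fn to_env(&self) -> Vec<(String, String)> {
        vec![
            ("TAP_DATABASE_URL".to_string(), self.database_url.clone()),
            (
                "TAP_COLLECTION_FILTERS".to_string(),
                // Repeated builder calls collapse into one comma-separated value.
                self.collection_filters.join(","),
            ),
            ("TAP_FULL_NETWORK".to_string(), self.full_network.to_string()),
            (
                "TAP_REPO_FETCH_TIMEOUT".to_string(),
                // Assumed format: whole seconds with an `s` suffix.
                format!("{}s", self.repo_fetch_timeout.as_secs()),
            ),
        ]
    }
}

/// Builds (but does not spawn) a command with the config applied as environment.
fn command_for(binary: &str, cfg: &SketchConfig) -> Command {
    let mut cmd = Command::new(binary);
    cmd.envs(cfg.to_env());
    cmd
}
```

The point of the typed layer is that mistakes such as a misspelled variable name or a malformed duration are caught at compile time or in one conversion function, rather than surfacing as a subprocess that starts with the wrong settings.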
### Working with Events
Events are automatically acknowledged when dropped:
```rust
use tapped::{Event, RecordAction};

let mut channel = client.channel().await?;

while let Ok(received) = channel.recv().await {
    match &received.event {
        Event::Record(record) => {
            match record.action {
                RecordAction::Create => {
                    // Access the raw JSON as a string
                    if let Some(json) = record.record_as_str() {
                        println!("Raw JSON: {}", json);
                    }

                    // Or deserialize to a specific type
                    // let post: MyPostType = record.deserialize_as()?;
                }
                RecordAction::Update => { /* ... */ }
                RecordAction::Delete => { /* ... */ }
                _ => {}
            }
        }
        Event::Identity(identity) => {
            println!("{} is now @{}", identity.did, identity.handle);
        }
    }
    // Ack sent automatically here when `received` goes out of scope
}
```
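
The acknowledge-on-drop behaviour above relies on a general Rust idiom: a value's `Drop` implementation runs when it goes out of scope. The sketch below illustrates that idiom with the standard library only. It is not how `tapped` is implemented; `Received` here is a hypothetical stand-in, and a plain `std::sync::mpsc` channel takes the place of the WebSocket connection.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical stand-in for a received event; not a type from `tapped`.
struct Received {
    id: u64,
    payload: String,
    /// Where acknowledgments are reported (an assumption of this sketch).
    acks: Sender<u64>,
}

impl Drop for Received {
    fn drop(&mut self) {
        // Runs whenever the value goes out of scope, including early exits.
        // A send error only means nobody is listening for acks any more.
        let _ = self.acks.send(self.id);
    }
}

/// Handles each payload in turn and returns the acknowledged ids in order.
fn process_all(payloads: &[&str]) -> Vec<u64> {
    let (tx, rx) = channel();
    for (i, payload) in payloads.iter().enumerate() {
        let received = Received {
            id: i as u64,
            payload: payload.to_string(),
            acks: tx.clone(),
        };
        println!("handling event {}: {}", received.id, received.payload);
        // `received` is dropped at the end of this iteration, sending the ack.
    }
    // Drop the last sender so the receiving iterator below terminates.
    drop(tx);
    rx.iter().collect()
}
```

One consequence of this design is that holding on to a received value (for example by pushing it into a `Vec`) delays its acknowledgment until that value is eventually dropped.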
### Managing Repositories
```rust
// Add repos to track
client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;
// Remove repos
client.remove_repos(&["did:plc:abc123"]).await?;
// Get info about a specific repo
let info = client.repo_info("did:plc:def456").await?;
println!("State: {:?}, Records: {}", info.state, info.records);
// Resolve a DID to its document
let doc = client.resolve_did("did:plc:def456").await?;
println!("Handles: {:?}", doc.also_known_as);
```
### Checking Stats
```rust
let repos = client.repo_count().await?;
let records = client.record_count().await?;
let outbox = client.outbox_buffer().await?;
let resync = client.resync_buffer().await?;
let cursors = client.cursors().await?;
println!("Tracking {} repos with {} records", repos, records);
println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
println!("Firehose cursor: {:?}", cursors.firehose);
```
## Example: Syncing standard.site Records with Schema Generation and Validation
The repository includes a complete example demonstrating how to sync and validate ATProto records using `tapped` together with the [jacquard](https://crates.io/crates/jacquard) crates.
The jacquard ecosystem provides runtime validation of records against their lexicon constraints, and the ability to generate Rust structs from lexicon JSON files.
```
tapped/
├── tapped/ # The main tapped library
├── lexicons-example/ # Generated types from lexicon schemas
│ ├── lexicons/ # Source lexicon JSON files
│ │ ├── site.standard.publication.json
│ │ ├── site.standard.document.json
│ │ └── ...
│ └── src/ # Generated Rust code
└── standard-site-sync/ # Example binary using both packages
```
The Rust code in `lexicons-example/src` was generated from the lexicon JSON files like so:
```bash
# Install the code generator
cargo install jacquard-lexgen
# Generate Rust types from the lexicon JSON files
jacquard-codegen -i lexicons-example/lexicons -o lexicons-example/src
```
This produces strongly-typed structs with built-in validation. For example, the type generated from the `site.standard.publication` lexicon can be used like this:
```rust
use lexicons_example::site_standard::publication::Publication;
// Deserialize from JSON
let publication: Publication = serde_json::from_str(json)?;
// Validate against lexicon constraints (max length, grapheme limits, etc.)
publication.validate()?;
// Access typed fields
println!("Name: {}", publication.name.as_str());
println!("URL: {}", publication.url.as_str());
```
For more detail, see `process_record_event` [in `main.rs`](https://tangled.org/octet-stream.net/tapped/blob/main/standard-site-sync/src/main.rs#L140-201).
## License
MIT