# tapped
A Rust wrapper library for the [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) ATProto sync utility.
`tapped` provides an idiomatic async Rust interface for spawning and communicating with a `tap` subprocess, making it easy to build applications that sync data from the ATProto network.
## Features
- Spawn and manage `tap` subprocesses with graceful shutdown
- Strongly-typed configuration for all of tap's environment variables
- Strongly-typed async Rust functions covering all of tap's HTTP API endpoints
- WebSocket-based event channel with automatic acknowledgment
## Installation
Add to your `Cargo.toml`:
```toml
[dependencies]
tapped = "0.1"
```
You'll also need the `tap` binary. Build it from the root of a checkout of the [indigo repository](https://github.com/bluesky-social/indigo):
```bash
cd cmd/tap && go build
```
`tapped` was most recently tested against:
```
tap version v0.0.0-20260114211028-207c9d49d0de-rev-207c9d4
```
## Quick Start
```rust
use tapped::{TapHandle, TapConfig, Event};

#[tokio::main]
async fn main() -> tapped::Result<()> {
    let config = TapConfig::builder()
        .database_url("sqlite://tap.db")
        .collection_filter("app.bsky.feed.post")
        .build();

    // Spawn tap and connect
    let handle = TapHandle::spawn_default(config).await?;

    // Subscribe to events
    let mut channel = handle.channel().await?;

    while let Ok(received) = channel.recv().await {
        match &received.event {
            Event::Record(record) => {
                println!(
                    "[{}] {}/{}",
                    record.action,
                    record.collection,
                    record.rkey
                );
            }
            Event::Identity(identity) => {
                println!("Identity: {} -> {}", identity.did, identity.handle);
            }
        }
        // Event is auto-acknowledged when `received` is dropped
    }

    Ok(())
}
```
## Usage Patterns
### Connect to Existing Instance
If a `tap` instance is already running, connect to it directly:
```rust
use tapped::TapClient;
let client = TapClient::new("http://localhost:2480")?;
client.health().await?;
```
### Spawn with Custom Binary Path
```rust
use tapped::{TapProcess, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://my-app.db")
    .build();

let mut process = TapProcess::spawn("/path/to/tap", config).await?;
let client = process.client();

// Use the client...

process.shutdown().await?;
```
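
To illustrate what a bounded shutdown involves, here is a minimal standard-library sketch, not `tapped`'s actual implementation. The helper name `wait_or_kill` is hypothetical, and the example assumes a Unix-like system where a `sleep` binary is available. It only covers the fallback half of a graceful shutdown: waiting up to a timeout and then killing the child. A real wrapper would first ask the child to stop cleanly.

```rust
use std::io;
use std::process::{Child, Command, ExitStatus};
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Hypothetical helper: wait for `child` to exit on its own and kill it
/// if `timeout` elapses first. Returns the exit status and whether the
/// child had to be killed.
fn wait_or_kill(child: &mut Child, timeout: Duration) -> io::Result<(ExitStatus, bool)> {
    let deadline = Instant::now() + timeout;
    loop {
        // `try_wait` returns Ok(None) while the child is still running.
        if let Some(status) = child.try_wait()? {
            return Ok((status, false));
        }
        if Instant::now() >= deadline {
            child.kill()?;
            // Reap the killed child so it does not linger as a zombie.
            return Ok((child.wait()?, true));
        }
        sleep(Duration::from_millis(10));
    }
}

fn main() -> io::Result<()> {
    // `sleep 30` stands in for a long-running subprocess (Unix assumption).
    let mut child = Command::new("sleep").arg("30").spawn()?;
    let (status, killed) = wait_or_kill(&mut child, Duration::from_millis(200))?;
    println!("exited with {}, killed: {}", status, killed);
    Ok(())
}
```

In this sketch the timeout plays the role that `shutdown_timeout` appears to play in `TapConfig`: an upper bound on how long shutdown may block.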
### Using TapHandle (Recommended)
`TapHandle` combines process management and client access:
```rust
use tapped::{TapHandle, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://app.db")
    .full_network(false)
    .build();

let handle = TapHandle::spawn_default(config).await?;

// TapHandle derefs to TapClient, so you can call client methods directly
handle.health().await?;
let count = handle.repo_count().await?;
println!("Tracking {} repos", count);
```
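
For readers unfamiliar with the deref pattern mentioned in the comment above, the following standalone sketch shows how a handle type can expose a client's methods through `std::ops::Deref`. The `Client` and `Handle` types and the `/health` path are hypothetical stand-ins for illustration, not the real `tapped` types or tap's routes.

```rust
use std::ops::Deref;

// Hypothetical stand-in for a client type; not the real TapClient.
struct Client {
    base_url: String,
}

impl Client {
    // Builds an illustrative URL; the path is an assumption for this sketch.
    fn health_url(&self) -> String {
        format!("{}/health", self.base_url)
    }
}

// Hypothetical handle that owns a client. A real wrapper would also own
// the child process here.
struct Handle {
    client: Client,
}

impl Deref for Handle {
    type Target = Client;

    fn deref(&self) -> &Client {
        &self.client
    }
}

fn main() {
    let handle = Handle {
        client: Client {
            base_url: "http://localhost:2480".to_string(),
        },
    };
    // Method lookup auto-derefs `Handle` to `Client`, so no `.client` is needed.
    println!("{}", handle.health_url());
}
```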
### Configuration Options
```rust
use tapped::{TapConfig, LogLevel};
use std::time::Duration;

let config = TapConfig::builder()
    // Database
    .database_url("sqlite://tap.db")
    .max_db_conns(10)
    // Network
    .bind("127.0.0.1:2480")
    .relay_url("wss://bsky.network")
    .plc_url("https://plc.directory")
    // Filtering
    .signal_collection("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.like")
    .full_network(false)
    // Performance
    .firehose_parallelism(10)
    .resync_parallelism(5)
    .outbox_parallelism(10)
    .outbox_capacity(10000)
    // Timeouts
    .repo_fetch_timeout(Duration::from_secs(30))
    .startup_timeout(Duration::from_secs(60))
    .shutdown_timeout(Duration::from_secs(10))
    // Logging
    .log_level(LogLevel::Info)
    .build();
```
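
Since the typed configuration ultimately maps to tap's environment variables, a rough sketch of that mapping may help. Everything here is hypothetical: `SketchConfig`, its fields, and the `EXAMPLE_*` variable names are placeholders chosen for illustration, so consult tap's own documentation for the real names and value formats.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

// Hypothetical config type; not the real TapConfig.
#[derive(Debug, Default)]
struct SketchConfig {
    database_url: Option<String>,
    collection_filters: Vec<String>,
    full_network: Option<bool>,
    repo_fetch_timeout: Option<Duration>,
}

impl SketchConfig {
    /// Render only the options that were set, so unset options fall back
    /// to the child process's own defaults.
    fn to_env(&self) -> BTreeMap<String, String> {
        let mut env = BTreeMap::new();
        if let Some(url) = &self.database_url {
            env.insert("EXAMPLE_DATABASE_URL".to_string(), url.clone());
        }
        if !self.collection_filters.is_empty() {
            // Placeholder encoding: repeated filters joined with commas.
            env.insert(
                "EXAMPLE_COLLECTION_FILTERS".to_string(),
                self.collection_filters.join(","),
            );
        }
        if let Some(full) = self.full_network {
            env.insert("EXAMPLE_FULL_NETWORK".to_string(), full.to_string());
        }
        if let Some(timeout) = self.repo_fetch_timeout {
            env.insert(
                "EXAMPLE_REPO_FETCH_TIMEOUT".to_string(),
                format!("{}s", timeout.as_secs()),
            );
        }
        env
    }
}

fn main() {
    let config = SketchConfig {
        database_url: Some("sqlite://tap.db".to_string()),
        collection_filters: vec![
            "app.bsky.feed.post".to_string(),
            "app.bsky.feed.like".to_string(),
        ],
        full_network: Some(false),
        repo_fetch_timeout: None,
    };
    for (key, value) in config.to_env() {
        println!("{}={}", key, value);
    }
}
```

A map like this can be passed to `std::process::Command::envs` when spawning a child process.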
### Working with Events
Each received event is acknowledged automatically when it is dropped, so the ack is sent once your handling code is finished with it:
```rust
use tapped::{Event, RecordAction};

let mut channel = client.channel().await?;

while let Ok(received) = channel.recv().await {
    match &received.event {
        Event::Record(record) => {
            match record.action {
                RecordAction::Create => {
                    if let Some(ref rec) = record.record {
                        // Access the raw JSON
                        println!("Type: {:?}", rec.record_type());

                        // Or deserialize to a specific type
                        // let post: MyPostType = rec.deserialize_as()?;
                    }
                }
                RecordAction::Update => { /* ... */ }
                RecordAction::Delete => { /* ... */ }
                _ => {}
            }
        }
        Event::Identity(identity) => {
            println!("{} is now @{}", identity.did, identity.handle);
        }
    }
    // Ack sent automatically here when `received` goes out of scope
}
```
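
The acknowledge-on-drop behavior described above can be modeled with Rust's `Drop` trait. The sketch below is a standalone illustration under assumed names (`Received`, `id`, `payload`, `acks`), using a standard-library channel in place of a WebSocket. It is not how `tapped` is necessarily implemented.

```rust
use std::sync::mpsc::{channel, Sender};

// Hypothetical wrapper: carries an event plus a sender used to report acks.
struct Received {
    id: u64,
    payload: String,
    acks: Sender<u64>,
}

impl Drop for Received {
    fn drop(&mut self) {
        // Ignore send errors: the receiving side may already be gone.
        let _ = self.acks.send(self.id);
    }
}

fn main() {
    let (tx, rx) = channel();

    for id in 1..=3 {
        let received = Received {
            id,
            payload: format!("event {}", id),
            acks: tx.clone(),
        };
        println!("handling {}", received.payload);
        // `received` is dropped at the end of each iteration, sending the ack.
    }

    // Drop the last sender so the receiver's iterator terminates.
    drop(tx);
    let acked: Vec<u64> = rx.iter().collect();
    println!("acked: {:?}", acked);
}
```

One consequence of this pattern is that keeping the value alive, for example by moving it into another task or a collection, delays the acknowledgment until that value is finally dropped.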
### Managing Repositories
```rust
// Add repos to track
client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;
// Remove repos
client.remove_repos(&["did:plc:abc123"]).await?;
// Get info about a specific repo
let info = client.repo_info("did:plc:def456").await?;
println!("State: {:?}, Records: {}", info.state, info.records);
// Resolve a DID to its document
let doc = client.resolve_did("did:plc:def456").await?;
println!("Handles: {:?}", doc.also_known_as);
```
### Checking Stats
```rust
let repos = client.repo_count().await?;
let records = client.record_count().await?;
let outbox = client.outbox_buffer().await?;
let resync = client.resync_buffer().await?;
let cursors = client.cursors().await?;
println!("Tracking {} repos with {} records", repos, records);
println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
println!("Firehose cursor: {:?}", cursors.firehose);
```
## Error Handling
All operations return `tapped::Result<T>`, which uses the `tapped::Error` enum:
```rust
use tapped::Error;

match client.health().await {
    Ok(()) => println!("Healthy!"),
    Err(Error::Http(e)) => println!("HTTP error: {}", e),
    Err(Error::Timeout) => println!("Request timed out"),
    Err(e) => println!("Other error: {}", e),
}
```
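
Matching on error variants also makes it straightforward to decide which failures are worth retrying. The following is a synchronous, standalone sketch of that idea. `SketchError`, `is_retryable`, and `retry` are hypothetical names, and treating only timeouts as retryable is an assumption made for illustration rather than a recommendation from `tapped`.

```rust
use std::fmt;

// Hypothetical mirror of an error enum with Http and Timeout variants.
#[derive(Debug)]
enum SketchError {
    Http(String),
    Timeout,
    Other(String),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::Http(msg) => write!(f, "HTTP error: {}", msg),
            SketchError::Timeout => write!(f, "request timed out"),
            SketchError::Other(msg) => write!(f, "other error: {}", msg),
        }
    }
}

impl std::error::Error for SketchError {}

// Assumption for this sketch: only timeouts are considered transient.
fn is_retryable(err: &SketchError) -> bool {
    matches!(err, SketchError::Timeout)
}

/// Run `op` up to `max_attempts` times, stopping early on success or on
/// the first error that is not retryable.
fn retry<T>(
    max_attempts: u32,
    mut op: impl FnMut() -> Result<T, SketchError>,
) -> Result<T, SketchError> {
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(e) if is_retryable(&e) && attempt < max_attempts => attempt += 1,
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let mut calls = 0;
    let result = retry(3, || {
        calls += 1;
        if calls < 2 {
            Err(SketchError::Timeout)
        } else {
            Err(SketchError::Http("503 from upstream".to_string()))
        }
    });
    match result {
        Ok(()) => println!("ok"),
        Err(e) => println!("gave up after {} calls: {}", calls, e),
    }
    println!("{}", SketchError::Other("example".to_string()));
}
```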
## Example: Syncing Standard Site Records
See [examples/standard_site_sync.rs](examples/standard_site_sync.rs) for a complete example that syncs `site.standard.publication` and `site.standard.document` records to local files.
```bash
cargo run --example standard_site_sync
```
## License
MIT