leasy 0.1.0

Distributed lease management library
# Leasy

A generic distributed lease manager in Rust: storage-agnostic and split-brain safe.

## Use Cases
- **Kinesis shard assignment**: replace the Kinesis Client Library (KCL) with your own lease coordination
- **Cron job deduplication**: ensure only one worker runs each scheduled job
- **Leader election**: elect a single master node
- **Database partition processing**: split bulk data jobs across workers

## Project Structure

```
src/
├── lib.rs          # Public exports
├── config.rs       # LeaseConfig with tunable intervals
├── error.rs        # LeaseError enum
├── lease.rs        # Lease struct + helpers
├── manager.rs      # LeaseManager — core rebalance & renewal engine
├── storage.rs      # LeaseStorage trait (implement for any backend)
└── adapters/
    ├── mod.rs      # Feature-gated adapter modules
    └── dynamodb.rs # DynamoDB backend (conditional writes)
```
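`storage.rs` holds the `LeaseStorage` trait that backends implement. Its exact methods are not documented here, so the sketch below is a hypothetical, simplified (synchronous) shape: `Storage`, `LeaseRecord`, `create_if_absent`, and `try_take` are illustrative names, not Leasy's API. What it does show is the property the split-brain section relies on: an ownership change applies only if the caller's last-seen `counter` is still current.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical lease row; the crate's real `Lease` struct may differ.
#[derive(Clone, Debug, PartialEq)]
pub struct LeaseRecord {
    pub lease_key: String,
    pub owner: Option<String>,
    pub counter: u64,
    pub expires_at_ms: u64,
}

/// Hypothetical, synchronous stand-in for a storage trait. Every ownership
/// change is conditional on the caller's last-seen `counter`.
pub trait Storage {
    fn get(&self, key: &str) -> Option<LeaseRecord>;
    /// Creates an unowned lease if none exists; never overwrites an existing row.
    fn create_if_absent(&self, key: &str);
    /// Sets `owner` only if the stored counter still equals `expected_counter`.
    fn try_take(&self, key: &str, owner: &str, expected_counter: u64, expires_at_ms: u64) -> bool;
}

/// In-memory backend; handy as a test double.
#[derive(Default)]
pub struct InMemoryStorage {
    leases: Mutex<HashMap<String, LeaseRecord>>,
}

impl Storage for InMemoryStorage {
    fn get(&self, key: &str) -> Option<LeaseRecord> {
        self.leases.lock().unwrap().get(key).cloned()
    }

    fn create_if_absent(&self, key: &str) {
        self.leases
            .lock()
            .unwrap()
            .entry(key.to_string())
            .or_insert_with(|| LeaseRecord {
                lease_key: key.to_string(),
                owner: None,
                counter: 0,
                expires_at_ms: 0,
            });
    }

    fn try_take(&self, key: &str, owner: &str, expected_counter: u64, expires_at_ms: u64) -> bool {
        let mut leases = self.leases.lock().unwrap();
        match leases.get_mut(key) {
            // Condition holds: apply the write and bump the counter so any other
            // writer still holding the old value fails its own check.
            Some(rec) if rec.counter == expected_counter => {
                rec.owner = Some(owner.to_string());
                rec.counter += 1;
                rec.expires_at_ms = expires_at_ms;
                true
            }
            // Missing row or stale counter: reject, like a failed conditional write.
            _ => false,
        }
    }
}

fn main() {
    let store = InMemoryStorage::default();
    store.create_if_absent("shard-0");

    // Two workers read the same row, then both try to take it.
    let seen = store.get("shard-0").unwrap().counter;
    assert!(store.try_take("shard-0", "worker-a", seen, 10_000));
    assert!(!store.try_take("shard-0", "worker-b", seen, 10_000));
    println!("{:?}", store.get("shard-0"));
}
```

An in-memory backend along these lines also makes a convenient stand-in for unit tests of code that sits on top of the lease manager.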

## Dependencies

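Add these to your consuming project's `Cargo.toml`. The `path` assumes a local checkout of Leasy at `../lib/Leasy`; `aws-sdk-kinesis` is needed only for the Kinesis example.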

```toml
[dependencies]
leasy = { path = "../lib/Leasy", features = ["dynamodb"] }
tokio = { version = "1", features = ["full"] }
aws-sdk-dynamodb = "1"
aws-sdk-kinesis = "1"
aws-config = "1"
uuid = { version = "1", features = ["v4"] }
anyhow = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
```

---

## DynamoDB Table Setup

Create a table with partition key `lease_key` (String). No sort key needed.

```bash
aws dynamodb create-table \
  --table-name kinesis-shard-leases \
  --attribute-definitions AttributeName=lease_key,AttributeType=S \
  --key-schema AttributeName=lease_key,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST
```

---

## Example: Kinesis Shard Consumer (replacing KCL)

Each ECS Fargate task runs this program. Leasy distributes shards across all
running tasks and reassigns them when a task fails; the processing loop records
per-shard checkpoints through the manager so a new owner can resume.

```rust
use anyhow::Result;
use aws_config::BehaviorVersion;
use aws_sdk_kinesis::types::ShardIteratorType;
use leasy::{DynamoLeaseStore, LeaseConfig, LeaseManager};
use std::sync::Arc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();

    // ── AWS clients ──────────────────────────────────────────────────
    let aws_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let dynamo_client = aws_sdk_dynamodb::Client::new(&aws_config);
    let kinesis_client = aws_sdk_kinesis::Client::new(&aws_config);

    // ── Lease store + manager ────────────────────────────────────────
    let lease_duration_ms = 10_000; // 10s lease; with 3s renewal → 7s safety buffer
    let store = Arc::new(DynamoLeaseStore::new(
        dynamo_client,
        "kinesis-shard-leases", // DynamoDB table name (PK = lease_key)
        lease_duration_ms,
    ));

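    // Use the ECS task ARN as a unique worker identity. ECS does not set an
    // ECS_TASK_ARN variable by default, so inject it yourself (for example from
    // the task metadata endpoint). Falls back to a random UUID when it is unset.
    let worker_id = std::env::var("ECS_TASK_ARN")
        .unwrap_or_else(|_| uuid::Uuid::new_v4().to_string());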

    let config = LeaseConfig {
        lease_duration_ms: 10_000,    // must match the value passed to the store
        renewal_interval_ms: 3_000,   // renew every 3s
        rebalance_interval_ms: 5_000, // rebalance every 5s (+jitter)
        max_leases_per_worker: None,  // no cap
    };

    let manager = Arc::new(LeaseManager::new(store, worker_id, config));

    // ── Discover shards and register as leases ───────────────────────
    let stream_name = "my-kinesis-stream";
    let mut shard_ids: Vec<String> = Vec::new();
    let mut next_token: Option<String> = None;

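    loop {
        // ListShards rejects StreamName when NextToken is set, so every page
        // after the first must pass the token on its own.
        let req = match next_token.take() {
            Some(token) => kinesis_client.list_shards().next_token(token),
            None => kinesis_client.list_shards().stream_name(stream_name),
        };
        let resp = req.send().await?;

        // Iterate through the borrowing accessor: moving `resp.shards` out would
        // make the `resp.next_token()` call below a use of a partially moved value.
        for shard in resp.shards() {
            shard_ids.push(shard.shard_id().to_string());
        }

        next_token = resp.next_token().map(str::to_string);
        if next_token.is_none() {
            break;
        }
    }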

    shard_ids.sort(); // deterministic order across all workers
    tracing::info!("Discovered {} shards", shard_ids.len());

    for shard_id in &shard_ids {
        manager.ensure_lease(shard_id).await?;
    }

    // ── Start background renewal + rebalance loops ───────────────────
    manager.clone().start_background_tasks();

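    // ── Processing loop ──────────────────────────────────────────────
    // Keep each owned shard's NextShardIterator between passes. Re-fetching an
    // iterator from the checkpoint every pass can stall, because GetRecords may
    // return an empty batch even when more data exists further along the shard.
    let mut iterators: std::collections::HashMap<String, String> =
        std::collections::HashMap::new();

    loop {
        let my_shards = manager.get_my_lease_keys().await?;
        tracing::info!("This worker owns {} shards", my_shards.len());

        // Drop cached iterators for shards this worker no longer owns.
        iterators.retain(|shard_id, _| my_shards.contains(shard_id));

        for shard_id in &my_shards {
            let iter = match iterators.remove(shard_id) {
                Some(iter) => iter,
                None => {
                    // First read since acquiring the lease: resume after the last
                    // checkpoint (sequence number) or start from TRIM_HORIZON.
                    let checkpoint = manager.get_checkpoint(shard_id).await?;

                    let iter_req = kinesis_client
                        .get_shard_iterator()
                        .stream_name(stream_name)
                        .shard_id(shard_id);

                    let iter_req = match &checkpoint {
                        Some(seq) => iter_req
                            .shard_iterator_type(ShardIteratorType::AfterSequenceNumber)
                            .starting_sequence_number(seq),
                        None => iter_req.shard_iterator_type(ShardIteratorType::TrimHorizon),
                    };

                    match iter_req.send().await?.shard_iterator {
                        Some(iter) => iter,
                        None => continue,
                    }
                }
            };

            let output = kinesis_client
                .get_records()
                .shard_iterator(iter)
                .limit(1000)
                .send()
                .await?;

            for record in output.records() {
                let data: &[u8] = record.data().as_ref();
                tracing::debug!(
                    "shard={} seq={} bytes={}",
                    shard_id,
                    record.sequence_number(),
                    data.len()
                );

                // ─── YOUR BUSINESS LOGIC HERE ───
                // process_record(data).await?;
            }

            // Checkpoint the last sequence number we processed
            if let Some(last) = output.records().last() {
                manager
                    .checkpoint(shard_id, last.sequence_number())
                    .await?;
            }

            // No NextShardIterator means the shard is closed and fully read.
            if let Some(next) = output.next_shard_iterator() {
                iterators.insert(shard_id.clone(), next.to_string());
            }
        }

        sleep(Duration::from_millis(200)).await;
    }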
}
```
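The example pairs a 10 s lease with a 3 s renewal interval and a 5 s rebalance interval, and its comments call the gap a 7 s safety buffer. The hypothetical helper below (not part of Leasy; `Timings` and its methods are illustrative names) makes that arithmetic explicit, counts how many renewal attempts land before a lease renewed at t = 0 expires (three, at 3 s, 6 s and 9 s), and estimates how long a crashed worker's lease sits idle before a survivor takes it. The takeover estimate assumes renewals ran on schedule until the crash and that a survivor's next rebalance starts at most one interval plus `max_jitter_ms` after expiry; the real jitter bound is not documented here, so the 1 s value is a placeholder.

```rust
/// Hypothetical mirror of the three timing fields in the example's
/// `LeaseConfig`; not the crate's type.
#[derive(Debug, Clone, Copy)]
struct Timings {
    lease_duration_ms: u64,
    renewal_interval_ms: u64,
    rebalance_interval_ms: u64,
}

impl Timings {
    /// Rejects settings where a lease could expire before its next renewal runs.
    fn validate(&self) -> Result<(), String> {
        if self.renewal_interval_ms == 0 {
            return Err("renewal_interval_ms must be greater than zero".into());
        }
        if self.renewal_interval_ms >= self.lease_duration_ms {
            return Err(format!(
                "renewal_interval_ms ({}) must be below lease_duration_ms ({})",
                self.renewal_interval_ms, self.lease_duration_ms
            ));
        }
        Ok(())
    }

    /// Time between the next scheduled renewal and the lease's expiry.
    fn safety_buffer_ms(&self) -> u64 {
        self.lease_duration_ms.saturating_sub(self.renewal_interval_ms)
    }

    /// Renewal attempts that land strictly before a lease renewed at t = 0 expires.
    fn renewal_attempts_before_expiry(&self) -> u64 {
        self.lease_duration_ms
            .saturating_sub(1)
            .checked_div(self.renewal_interval_ms)
            .unwrap_or(0)
    }

    /// Rough (best, worst) delay before a survivor steals a crashed worker's lease.
    fn takeover_window_ms(&self, max_jitter_ms: u64) -> (u64, u64) {
        // Best case: crash just before a renewal was due; a rebalance fires at expiry.
        let best = self.safety_buffer_ms();
        // Worst case: crash right after renewing; the survivor just missed expiry.
        let worst = self.lease_duration_ms + self.rebalance_interval_ms + max_jitter_ms;
        (best, worst)
    }
}

fn main() {
    let t = Timings {
        lease_duration_ms: 10_000,
        renewal_interval_ms: 3_000,
        rebalance_interval_ms: 5_000,
    };
    t.validate().expect("example timings are valid");
    let (best, worst) = t.takeover_window_ms(1_000); // 1 s jitter is an assumed placeholder
    println!(
        "buffer = {} ms, renewal attempts = {}, takeover ~{}..={} ms",
        t.safety_buffer_ms(),
        t.renewal_attempts_before_expiry(),
        best,
        worst
    );
}
```

Because the new owner resumes from the last checkpoint, records processed after that checkpoint but before the crash are read again, so processing in this example is at-least-once.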

---

## Architecture

### Lease Lifecycle
```
Create → Acquire → Renew → Release
                  Expire → Steal (by another worker)
```
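The lifecycle reads naturally as a small state machine. The sketch below encodes the diagram's transitions; `LeaseState`, `Event`, and `next` are hypothetical names rather than Leasy types, and allowing a released lease to be acquired again is an assumption the diagram does not state.

```rust
/// Hypothetical model of the lease lifecycle; Leasy may not expose state this way.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LeaseState {
    Created,  // row exists, no owner yet
    Held,     // owned and not yet expired
    Expired,  // owner stopped renewing and expires_at has passed
    Released, // owner gave the lease up voluntarily
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Event {
    Acquire,
    Renew,
    Release,
    ExpiryPasses,
    Steal, // another worker takes an expired lease
}

/// Returns the next state, or `None` if the diagram does not allow the transition.
fn next(state: LeaseState, event: Event) -> Option<LeaseState> {
    match (state, event) {
        (LeaseState::Created, Event::Acquire) => Some(LeaseState::Held),
        // Assumption: a released lease can be acquired again.
        (LeaseState::Released, Event::Acquire) => Some(LeaseState::Held),
        (LeaseState::Held, Event::Renew) => Some(LeaseState::Held),
        (LeaseState::Held, Event::Release) => Some(LeaseState::Released),
        (LeaseState::Held, Event::ExpiryPasses) => Some(LeaseState::Expired),
        (LeaseState::Expired, Event::Steal) => Some(LeaseState::Held),
        _ => None,
    }
}

fn main() {
    // Create → Acquire → Renew → (owner crashes) Expire → Steal
    let mut state = LeaseState::Created;
    for event in [Event::Acquire, Event::Renew, Event::ExpiryPasses, Event::Steal] {
        state = next(state, event).expect("transition allowed by the diagram");
        println!("{event:?} -> {state:?}");
    }
    assert_eq!(state, LeaseState::Held);
}
```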

### Split Brain Prevention
Lease acquisition and renewal use DynamoDB conditional writes on a monotonically
increasing `counter` field. If two workers race to acquire or renew the same lease,
exactly one write succeeds; the other's condition check fails, its call returns
`false`, and it backs off gracefully.
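The guarantee comes from a read-then-conditionally-write pattern: each worker remembers the `counter` it read, and its write applies only if the stored value is unchanged. The std-only sketch below emulates that check with a `Mutex` standing in for DynamoDB's server-side condition evaluation; `LeaseRow`, `conditional_acquire`, and `race` are hypothetical names, not Leasy's API. A `Barrier` makes every worker read the counter before any of them writes, so all of them race with the same stale value.

```rust
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Hypothetical lease row with just the fields this race needs.
struct LeaseRow {
    owner: Option<String>,
    counter: u64,
}

/// Emulates a conditional write: applies only if `counter` still equals the
/// value the caller read, and bumps it so every other in-flight writer fails.
fn conditional_acquire(row: &Mutex<LeaseRow>, worker: &str, expected: u64) -> bool {
    let mut r = row.lock().unwrap();
    if r.counter != expected {
        return false; // condition failed: someone else won, so back off
    }
    r.owner = Some(worker.to_string());
    r.counter += 1;
    true
}

/// Spawns `n` workers that all read the same counter, then race to acquire.
/// Returns the number of successful acquisitions and the final owner.
fn race(n: usize) -> (usize, Option<String>) {
    let row = Arc::new(Mutex::new(LeaseRow { owner: None, counter: 0 }));
    let barrier = Arc::new(Barrier::new(n));

    let handles: Vec<_> = (0..n)
        .map(|i| {
            let row = Arc::clone(&row);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                let seen = row.lock().unwrap().counter; // read phase
                barrier.wait(); // nobody writes until everyone has read
                conditional_acquire(&row, &format!("worker-{i}"), seen)
            })
        })
        .collect();

    let wins = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&won| won)
        .count();
    let owner = row.lock().unwrap().owner.clone();
    (wins, owner)
}

fn main() {
    let (wins, owner) = race(8);
    assert_eq!(wins, 1);
    println!("{wins} of 8 workers acquired the lease; owner = {owner:?}");
}
```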

### Rebalance Algorithm
```
total_leases   = 10
active_workers = 3
target_per_worker = ceil(10/3) = 4

Worker A → 4 leases
Worker B → 3 leases
Worker C → 3 leases
```
When claiming leases, a worker takes unowned leases first, then expired ones.
Adding jitter to the rebalance interval keeps workers from polling DynamoDB in
lockstep, which avoids a thundering herd.
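The target arithmetic and claim order can be sketched as a pure function. This is a hypothetical planner written from the description above, not Leasy's implementation: `LeaseView`, `view`, `target_per_worker`, and `leases_to_claim` are illustrative names, and the sketch covers only the two categories named here (unowned, then expired), without taking leases from overloaded workers or applying jitter.

```rust
/// Hypothetical snapshot of one lease as a worker sees it during rebalance.
#[derive(Debug, Clone)]
struct LeaseView {
    key: String,
    owner: Option<String>,
    expires_at_ms: u64,
}

fn view(key: &str, owner: Option<&str>, expires_at_ms: u64) -> LeaseView {
    LeaseView { key: key.to_string(), owner: owner.map(str::to_string), expires_at_ms }
}

/// Fair share per worker: ceil(total_leases / active_workers).
fn target_per_worker(total_leases: usize, active_workers: usize) -> usize {
    if active_workers == 0 {
        return 0;
    }
    (total_leases + active_workers - 1) / active_workers
}

/// Leases `me` should try to claim this pass: unowned first, then expired,
/// stopping once `me` would reach its target.
fn leases_to_claim<'a>(
    leases: &'a [LeaseView],
    me: &str,
    active_workers: usize,
    now_ms: u64,
) -> Vec<&'a str> {
    let target = target_per_worker(leases.len(), active_workers);
    let held = leases
        .iter()
        .filter(|l| l.owner.as_deref() == Some(me) && l.expires_at_ms > now_ms)
        .count();
    let want = target.saturating_sub(held);

    let unowned = leases.iter().filter(|l| l.owner.is_none());
    let expired = leases
        .iter()
        .filter(|l| l.owner.is_some() && l.expires_at_ms <= now_ms);

    unowned.chain(expired).take(want).map(|l| l.key.as_str()).collect()
}

fn main() {
    assert_eq!(target_per_worker(10, 3), 4); // matches the table above

    // 10 leases: A holds lease-0..3; crashed worker C's lease-4 expired at t=1000.
    let mut leases: Vec<LeaseView> =
        (0..10).map(|i| view(&format!("lease-{i}"), None, 0)).collect();
    for l in &mut leases[..4] {
        l.owner = Some("A".to_string());
        l.expires_at_ms = 10_000;
    }
    leases[4] = view("lease-4", Some("C"), 1_000);

    // Worker B at t=5000 holds nothing, wants 4, and takes unowned leases first.
    let claim = leases_to_claim(&leases, "B", 3, 5_000);
    assert_eq!(claim, vec!["lease-5", "lease-6", "lease-7", "lease-8"]);
    println!("B claims {claim:?}");
}
```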

### Failure Recovery
```
Worker crashes → stops renewing → lease expires_at passes →
next rebalance on surviving workers detects expiry →
steals the lease → processing resumes from last checkpoint
```