# SipFlow — SIP Signaling & RTP Recording Subsystem
## Overview
SipFlow is the SIP/RTP packet capture and recording subsystem in rustpbx. It supports two local storage engines (SQLite legacy and FlowDB LSM-tree) and a remote cluster mode, providing signaling replay, WAV export from RTP, and media quality statistics.
---
## Architecture
```
SIP / RTP data flow
         │
         ▼
┌─────────────────┐     ┌──────────────────┐
│ callrecord/     │────▶│ SipFlowBackend   │
│ sipflow.rs      │     │ (trait)          │
│ (MessageInsp.)  │     └────────┬─────────┘
└─────────────────┘              │
                    ┌────────────┼────────────┐
                    ▼            ▼            ▼
               ┌──────────┐ ┌──────────┐ ┌────────────┐
               │  Local   │ │  Local   │ │   Remote   │
               │ (SQLite) │ │ (FlowDB) │ │ (UDP+HTTP) │
               └──────────┘ └──────────┘ └────────────┘
```
**Modules:**

| Module | Path | Description |
|---|---|---|
| `sipflow/mod.rs` | `src/sipflow/mod.rs` | Core types: `SipFlowItem`, `SipFlowMsgType`, `SipFlowMediaStats`, `SipFlowQuery` |
| `sipflow/backend/mod.rs` | `src/sipflow/backend/mod.rs` | `SipFlowBackend` trait + `create_backend()` factory |
| `sipflow/backend/local.rs` | `src/sipflow/backend/local.rs` | Local SQLite backend (background worker + StorageManager) |
| `sipflow/backend/remote.rs` | `src/sipflow/backend/remote.rs` | Remote cluster backend (UDP + HTTP, jump consistent hash) |
| `sipflow/storage.rs` | `src/sipflow/storage.rs` | SQLite + raw-file storage manager |
| `sipflow/flowdb_backend.rs` | `src/sipflow/flowdb_backend.rs` | FlowDB backend implementation |
| `sipflow/flowdb_codec.rs` | `src/sipflow/flowdb_codec.rs` | FlowDB key/value encoding |
| `sipflow/protocol.rs` | `src/sipflow/protocol.rs` | Binary wire protocol for UDP transport |
| `sipflow/wav_utils.rs` | `src/sipflow/wav_utils.rs` | RTP → WAV generation with codec transcoding |
| `sipflow/rtp_stats.rs` | `src/sipflow/rtp_stats.rs` | RTP jitter/loss statistics |
| `sipflow/sdp_utils.rs` | `src/sipflow/sdp_utils.rs` | SDP parsing helpers |
| `bin/sipflow.rs` | `src/bin/sipflow.rs` | Standalone sipflow server binary |
| `callrecord/sipflow.rs` | `src/callrecord/sipflow.rs` | SIP message inspector (MessageInspector) with batch writer + object pool |
| `callrecord/sipflow_upload.rs` | `src/callrecord/sipflow_upload.rs` | S3/HTTP upload hooks |
| `console/handlers/sipflow.rs` | `src/console/handlers/sipflow.rs` | Console REST API endpoints |
---
## Data Flow Paths
### 1. Embedded Mode (callrecord integration)
```
SIP message / RTP sample
→ SipFlow::inspect_message() / SipFlow::record_rtp()
→ crossbeam channel (BATCH_SIZE=256, flush interval 50ms)
→ SipFlowBackend::record()
→ StorageManager / FlowDB
```
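The batching step in this path can be sketched as follows. This is a minimal illustration, not the actual `callrecord/sipflow.rs` code: it uses `std::sync::mpsc` instead of crossbeam to stay dependency-free, and `run_batch_writer` and its `flush` callback are hypothetical names. Only the batch size (256) and the flush interval (50 ms) come from the flow above.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

const BATCH_SIZE: usize = 256;
const FLUSH_INTERVAL: Duration = Duration::from_millis(50);

/// Drains `rx` and calls `flush` whenever a batch is full or the flush
/// interval has elapsed with pending items. Returns once all senders are gone.
fn run_batch_writer<T>(rx: Receiver<T>, mut flush: impl FnMut(Vec<T>)) {
    let mut batch = Vec::with_capacity(BATCH_SIZE);
    let mut deadline = Instant::now() + FLUSH_INTERVAL;
    loop {
        let wait = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(wait) {
            Ok(item) => {
                batch.push(item);
                if batch.len() < BATCH_SIZE {
                    continue; // keep filling until full or timed out
                }
            }
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => {
                // Flush whatever is left, then stop.
                if !batch.is_empty() {
                    flush(batch);
                }
                return;
            }
        }
        if !batch.is_empty() {
            flush(std::mem::replace(&mut batch, Vec::with_capacity(BATCH_SIZE)));
        }
        deadline = Instant::now() + FLUSH_INTERVAL;
    }
}
```

In a real integration, `flush` would hand the batch to `SipFlowBackend::record()`; the size-or-deadline trigger bounds both the per-write overhead and the worst-case delay before a record reaches storage.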
### 2. Standalone Server Mode
```
UDP packet → parse_packet() → mpsc channel → convert_packet_to_item()
→ SipFlowBackend::record()
→ SQLite (sipflow.db + data.raw) or FlowDB
→ HTTP API: /flow, /media, /health
```
### 3. Remote Cluster Mode (RemoteBackend)
```
rustpbx node → UDP send to sipflow cluster
→ Jump Consistent Hash node selection
→ Cluster node stores data + HTTP query API
```
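Node selection by jump consistent hash can be sketched as below. The `jump_consistent_hash` function follows the published Lamping and Veach algorithm; how `RemoteBackend` derives the 64-bit key from the Call-ID is not stated here, so the FNV-1a hash and the `select_node` helper are assumptions for illustration only.

```rust
/// FNV-1a 64-bit hash. Assumption: a stand-in for whatever hash the real
/// backend applies to the Call-ID before node selection.
fn fnv1a64(data: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in data {
        h ^= b as u64;
        h = h.wrapping_mul(0x0000_0100_0000_01b3);
    }
    h
}

/// Jump consistent hash: maps `key` to a bucket in `0..num_buckets`.
/// Growing the bucket count from n to n+1 moves only about 1/(n+1) of the keys.
fn jump_consistent_hash(mut key: u64, num_buckets: u32) -> u32 {
    assert!(num_buckets > 0, "at least one bucket is required");
    let mut b: i64 = -1;
    let mut j: i64 = 0;
    while j < num_buckets as i64 {
        b = j;
        key = key.wrapping_mul(2862933555777941757).wrapping_add(1);
        j = ((b + 1) as f64 * ((1u64 << 31) as f64 / ((key >> 33) + 1) as f64)) as i64;
    }
    b as u32
}

/// Hypothetical helper: picks the cluster node responsible for a call.
fn select_node<'a>(call_id: &str, nodes: &'a [&'a str]) -> &'a str {
    let idx = jump_consistent_hash(fnv1a64(call_id.as_bytes()), nodes.len() as u32);
    nodes[idx as usize]
}
```

Because every packet of a call hashes to the same node, a later HTTP query for that Call-ID can be sent to the one node that holds its data, provided the sender and the querier see the same ordered node list.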
---
## Storage Backend Comparison
| Aspect | SQLite | FlowDB |
|---|---|---|
| Storage layout | sipflow.db (metadata) + data.raw (payloads) | LSM-tree single directory |
| Write method | Batch INSERT + transaction commit | Per-record `write_batch_sync` |
| Compression | Per-packet zstd (level 3, ≥96 bytes) | Block-level LSM compression |
| TTL | No auto-expiry | Built-in TTL garbage collection |
| Subdirectory policy | None / Daily / Hourly | Single directory |
| Data isolation | JOIN via `call_meta` table | Key prefix scan (`sip:{id}:`, `rtp:{id}:`) |
### SQLite Storage Format
**sipflow.db schema:**
```sql
CREATE TABLE call_meta (
id INTEGER PRIMARY KEY AUTOINCREMENT,
callid TEXT UNIQUE NOT NULL
);
CREATE INDEX idx_callid ON call_meta(callid);
CREATE TABLE sip_msgs (
id INTEGER PRIMARY KEY,
call_id INTEGER NOT NULL,
src TEXT NOT NULL, dst TEXT NOT NULL,
timestamp INTEGER NOT NULL,
offset INTEGER NOT NULL, size INTEGER NOT NULL
);
CREATE INDEX idx_sip_call ON sip_msgs(call_id);
CREATE TABLE media_msgs (
id INTEGER PRIMARY KEY,
call_id INTEGER NOT NULL, leg INTEGER NOT NULL,
src TEXT NOT NULL DEFAULT '',
timestamp INTEGER NOT NULL,
offset INTEGER NOT NULL, size INTEGER NOT NULL
);
CREATE INDEX idx_media_call ON media_msgs(call_id);
CREATE INDEX idx_media_call_timestamp ON media_msgs(call_id, timestamp);
```
**data.raw record format:**
The payload is zstd-compressed when `orig_size ≥ 96` bytes. A stored payload that begins with the magic bytes `0x28 0xB5 0x2F 0xFD` is a zstd frame and must be decompressed on read.
### FlowDB Storage Format
**Key design:**
```
SIP: sip:{call_id}:{counter} (20-digit counter ensures ordering)
RTP: rtp:{call_id}:{leg}:{counter}
```
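A small sketch of why the counter is zero-padded: with a fixed 20-digit width (enough for any `u64`), lexicographic key order equals numeric order, so a prefix scan returns records in write order. The function names and the `u64` counter type are assumptions; only the key layout comes from the design above.

```rust
/// Key for a SIP record: `sip:{call_id}:{counter}` with a 20-digit counter.
fn sip_key(call_id: &str, counter: u64) -> String {
    format!("sip:{}:{:020}", call_id, counter)
}

/// Key for an RTP record: `rtp:{call_id}:{leg}:{counter}`.
fn rtp_key(call_id: &str, leg: i32, counter: u64) -> String {
    format!("rtp:{}:{}:{:020}", call_id, leg, counter)
}

/// Prefix used to scan all SIP records of one call.
fn sip_prefix(call_id: &str) -> String {
    format!("sip:{}:", call_id)
}
```

Without the padding, the key for counter 10 would sort before the key for counter 9, and a scan would return messages out of order.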
**Value encoding:**
```
```
---
## Performance Benchmark
Results from `cargo run --release --example sipflow_bench -- --calls 50 --rtp-per-call 1000 --sip-per-call 20`
**Test environment:** 16C32G, NVMe SSD
### Write Throughput
| Engine | Records | Write time | Throughput | Disk usage | Flow query latency | Media query latency |
|---|---|---|---|---|---|---|
| SQLite | 51,000 | 2.59s | 19,678 rec/s | 5,810 KB | 0.3 ms | 0.3 ms |
| FlowDB | 51,000 | 0.20s | 255,661 rec/s | 1,030 KB | 0.1 ms | 0.0 ms |
### Summary
| Metric | SQLite | FlowDB | Comparison |
|---|---|---|---|
| Write throughput | 19,678 rec/s | 255,661 rec/s | FlowDB 13× faster |
| Disk space | 5,810 KB | 1,030 KB | FlowDB 5.6× smaller |
| Flow query latency | 0.3 ms | 0.1 ms | FlowDB 4.9× faster |
| Media query latency | 0.3 ms | 0.0 ms | FlowDB faster |
> **Note:** FlowDB commits records with synchronous batch writes and achieves about **13× higher write throughput** than SQLite in this benchmark while using 5.6× less disk space. The default `flush_count` (0) and `flush_interval_secs` (0) disable app-level buffering, so each record is written to the engine immediately and is queryable at once via FlowDB's LSM memtable. FlowDB's block-level LSM compression is more effective than per-packet zstd on small frames.
### Correctness Verification
- SIP flow count: SQLite=20 ✓, FlowDB=20 ✓ (MATCH)
- RTP packet sum: SQLite=1000 ✓, FlowDB=1000 ✓ (MATCH)
- Isolation (non-existent call): both return empty ✓
---
## Configuration
### TOML Examples
**Local FlowDB (default):**
```toml
[sipflow]
type = "local"
root = "/var/sipflow/data"
engine = "flowdb" # "flowdb" (default) or "sqlite"
ttl_secs = 86400 # optional TTL
memtable_size_mb = 64
block_cache_capacity_mb = 128
# Optional S3/HTTP upload hook
[sipflow.upload]
type = "s3"
vendor = "aws"
bucket = "sipflow-recordings"
region = "us-east-1"
access_key = "..."
secret_key = "..."
endpoint = "https://s3.amazonaws.com"
root = "recordings/"
```
**Local SQLite:**
```toml
[sipflow]
type = "local"
root = "/var/sipflow/data"
engine = "sqlite"
subdirs = "daily" # "none" / "daily" / "hourly" (SQLite only)
# flush_count = 0 # 0=immediate write, no app-level buffering
# flush_interval_secs = 0
id_cache_size = 8192
```
**Remote cluster:**
```toml
[sipflow]
type = "remote"
nodes = [
{ udp = "10.0.0.1:3000", http = "http://10.0.0.1:3001" },
{ udp = "10.0.0.2:3000", http = "http://10.0.0.2:3001" },
]
timeout_secs = 10
# Legacy single-node format:
# udp_addr = "127.0.0.1:3000"
# http_addr = "http://127.0.0.1:3001"
```
### Configuration Fields
| Field | Type | Default | Description |
|---|---|---|---|
| `type` | `"local"` / `"remote"` | required | Backend type |
| `root` | String | - | Data directory (local) |
| `engine` | `"flowdb"` / `"sqlite"` | `"flowdb"` | Storage engine (local) |
| `subdirs` | `"none"` / `"daily"` / `"hourly"` | `"daily"` | Directory partitioning (local SQLite) |
| `flush_count` | usize | 0 | Batch size before flush; 0 = immediate write (local) |
| `flush_interval_secs` | u64 | 0 | Max flush interval; 0 = no timer (local) |
| `id_cache_size` | usize | 8192 | CallID→ID LRU cache (local SQLite) |
| `ttl_secs` | Option\<u64\> | None | FlowDB TTL (local FlowDB) |
| `memtable_size_mb` | usize | 64 | FlowDB memtable size (local FlowDB) |
| `block_cache_capacity_mb` | usize | 128 | FlowDB block cache (local FlowDB) |
| `nodes` | Vec\<SipFlowClusterNode\> | [] | Cluster node list (remote) |
| `timeout_secs` | u64 | 10 | HTTP query timeout (remote) |
| `upload` | Option\<SipFlowUploadConfig\> | None | S3/HTTP upload hook |
---
## WAV Generation
The `wav_utils.rs` module reconstructs WAV audio from captured RTP packets:
- Automatic codec detection from SIP SDP negotiation
- Mixed-leg merge (both legs in one WAV) or per-leg download
- **Codec support:**

| Payload type | Codec | WAV output | Transcoded |
|---|---|---|---|
| 0 | PCMU | PCMU (tag=7) | No |
| 8 | PCMA | PCMA (tag=6) | No |
| 9 | G722 | L16 16kHz | Yes |
| 18 | G729 | L16 8kHz | Yes |
| 101 | telephone-event | DTMF | Regenerated |
| Dynamic | Opus | L16 48kHz | Yes |
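To illustrate the format tags in the table, here is a sketch of a canonical 44-byte WAV header writer. It is not the `wav_utils.rs` implementation; the function names are hypothetical, and strict readers may expect an extended `fmt ` chunk plus a `fact` chunk for non-PCM tags, which this sketch omits.

```rust
const WAVE_FORMAT_PCM: u16 = 1; // linear PCM (L16 output)
const WAVE_FORMAT_ALAW: u16 = 6; // G.711 A-law (PCMA)
const WAVE_FORMAT_MULAW: u16 = 7; // G.711 mu-law (PCMU)

/// Maps an RTP payload type to the WAV format tag listed in the table.
/// Transcoded codecs end up as linear PCM.
fn format_tag_for_payload_type(pt: u8) -> u16 {
    match pt {
        0 => WAVE_FORMAT_MULAW,
        8 => WAVE_FORMAT_ALAW,
        _ => WAVE_FORMAT_PCM,
    }
}

/// Builds a 44-byte RIFF/WAVE header for `data_len` bytes of audio data.
fn wav_header(
    format_tag: u16,
    channels: u16,
    sample_rate: u32,
    bits_per_sample: u16,
    data_len: u32,
) -> Vec<u8> {
    let block_align = channels * bits_per_sample / 8;
    let byte_rate = sample_rate * block_align as u32;
    let mut h = Vec::with_capacity(44);
    h.extend_from_slice(b"RIFF");
    h.extend_from_slice(&(36 + data_len).to_le_bytes()); // RIFF chunk size
    h.extend_from_slice(b"WAVE");
    h.extend_from_slice(b"fmt ");
    h.extend_from_slice(&16u32.to_le_bytes()); // fmt chunk size
    h.extend_from_slice(&format_tag.to_le_bytes());
    h.extend_from_slice(&channels.to_le_bytes());
    h.extend_from_slice(&sample_rate.to_le_bytes());
    h.extend_from_slice(&byte_rate.to_le_bytes());
    h.extend_from_slice(&block_align.to_le_bytes());
    h.extend_from_slice(&bits_per_sample.to_le_bytes());
    h.extend_from_slice(b"data");
    h.extend_from_slice(&data_len.to_le_bytes());
    h
}
```

Storing PCMU and PCMA with their native tags lets the RTP payload bytes be copied into the `data` chunk unchanged, which is why those two rows need no transcoding.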
---
## REST API
### Console API (embedded)
| Method | Path | Description |
|---|---|---|
| GET | `/sipflow/settings` | Get current config |
| PUT | `/sipflow/settings` | Update config |
| GET | `/sipflow/flow/{call_id}` | Query SIP signaling flow |
| GET | `/sipflow/media/{call_id}` | Query media (WAV download) |
### Standalone Server API
| Method | Path | Query parameters | Description |
|---|---|---|---|
| GET | `/health` | - | Health check |
| GET | `/flow` | `callid`, `start`, `end` | SIP flow (JSON) |
| GET | `/media` | `callid`, `start`, `end`, `stats` | Media WAV or stats |
- `start` / `end`: Unix timestamp (seconds), defaults to ±1 hour from now
- `stats=1`: Return RTP statistics instead of audio file
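As an illustration of the query parameters, the following sketch builds a `/media` URL with an explicit time window. The helper names are hypothetical, and percent-encoding the Call-ID is an assumption based on ordinary URL rules (Call-IDs often contain `@`); the parameter names come from the table above.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Percent-encodes everything except RFC 3986 unreserved characters.
fn percent_encode(s: &str) -> String {
    let mut out = String::new();
    for b in s.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(b as char)
            }
            _ => out.push_str(&format!("%{:02X}", b)),
        }
    }
    out
}

/// Builds the `/media` query URL; `stats = true` requests RTP statistics.
fn media_url(base: &str, call_id: &str, start: u64, end: u64, stats: bool) -> String {
    let mut url = format!(
        "{}/media?callid={}&start={}&end={}",
        base.trim_end_matches('/'),
        percent_encode(call_id),
        start,
        end
    );
    if stats {
        url.push_str("&stats=1");
    }
    url
}

/// Mirrors the documented default window: one hour either side of now.
fn default_window() -> (u64, u64) {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    (now.saturating_sub(3600), now + 3600)
}
```

Passing an explicit window is useful for calls older than one hour, since the default window would otherwise exclude them.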
---
## Wire Protocol (UDP)
Used by the standalone sipflow server's UDP receiver and by the RemoteBackend transport.
```
offset size field
0 1 version (currently 1)
1 1 msg_type (0=SIP, 1=RTP)
2 2 src_port (big endian)
4 16 src_ip (IPv6 address, IPv4 uses ::ffff:x.x.x.x)
20 2 dst_port
22 16 dst_ip
38 8 timestamp (microseconds)
46 4 call_id_len
50 N call_id (UTF-8)
50+N 1 leg_len (0=no leg, 1=leg in next byte, otherwise has leg)
51+N M leg value (when leg_len=1, 1 byte)
51+N+M K payload (52+N when the one-byte leg is present)
```
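The layout can be exercised with a small encode/decode pair. This is a sketch under stated assumptions, not the code in `protocol.rs`: all multi-byte integers are treated as big endian (the layout only says so for `src_port`), the leg is handled only as `leg_len = 0` or `leg_len = 1`, and the `Packet` type and function names are hypothetical.

```rust
use std::net::{IpAddr, Ipv6Addr, SocketAddr};

#[derive(Debug, PartialEq)]
struct Packet {
    msg_type: u8, // 0 = SIP, 1 = RTP
    src: SocketAddr,
    dst: SocketAddr,
    timestamp_us: u64,
    call_id: String,
    leg: Option<u8>,
    payload: Vec<u8>,
}

/// 16-byte address field; IPv4 is stored as ::ffff:x.x.x.x.
fn ip16(ip: IpAddr) -> [u8; 16] {
    match ip {
        IpAddr::V4(v4) => v4.to_ipv6_mapped().octets(),
        IpAddr::V6(v6) => v6.octets(),
    }
}

fn encode(p: &Packet) -> Vec<u8> {
    let mut b = vec![1, p.msg_type]; // version 1, then msg_type
    b.extend_from_slice(&p.src.port().to_be_bytes());
    b.extend_from_slice(&ip16(p.src.ip()));
    b.extend_from_slice(&p.dst.port().to_be_bytes());
    b.extend_from_slice(&ip16(p.dst.ip()));
    b.extend_from_slice(&p.timestamp_us.to_be_bytes());
    b.extend_from_slice(&(p.call_id.len() as u32).to_be_bytes());
    b.extend_from_slice(p.call_id.as_bytes());
    match p.leg {
        Some(leg) => b.extend_from_slice(&[1, leg]),
        None => b.push(0),
    }
    b.extend_from_slice(&p.payload);
    b
}

/// Reads a port + 16-byte address pair starting at `off`.
fn addr_at(b: &[u8], off: usize) -> SocketAddr {
    let port = u16::from_be_bytes([b[off], b[off + 1]]);
    let mut raw = [0u8; 16];
    raw.copy_from_slice(&b[off + 2..off + 18]);
    let v6 = Ipv6Addr::from(raw);
    let ip = match v6.to_ipv4_mapped() {
        Some(v4) => IpAddr::V4(v4),
        None => IpAddr::V6(v6),
    };
    SocketAddr::new(ip, port)
}

fn decode(b: &[u8]) -> Option<Packet> {
    // 50 fixed header bytes plus at least the leg_len byte.
    if b.len() < 51 || b[0] != 1 {
        return None;
    }
    let mut ts = [0u8; 8];
    ts.copy_from_slice(&b[38..46]);
    let n = u32::from_be_bytes([b[46], b[47], b[48], b[49]]) as usize;
    let leg_off = 50usize.checked_add(n)?;
    let call_id = std::str::from_utf8(b.get(50..leg_off)?).ok()?.to_string();
    let (leg, payload_off) = match *b.get(leg_off)? {
        0 => (None, leg_off + 1),
        1 => (Some(*b.get(leg_off + 1)?), leg_off + 2),
        _ => return None, // other leg_len values are outside this sketch
    };
    Some(Packet {
        msg_type: b[1],
        src: addr_at(b, 2),
        dst: addr_at(b, 20),
        timestamp_us: u64::from_be_bytes(ts),
        call_id,
        leg,
        payload: b.get(payload_off..)?.to_vec(),
    })
}
```

Using a fixed 16-byte address field keeps every offset up to `call_id` constant regardless of IP version, at the cost of 12 extra bytes per IPv4 address.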
---
## Standalone Server
```bash
# Start with FlowDB engine (default)
cargo run --release --bin sipflow -- \
--port 3000 --http-port 3001 \
--root /var/sipflow/data
# Start with SQLite engine
cargo run --release --bin sipflow -- \
--engine sqlite \
--port 3000 --http-port 3001 \
--root /var/sipflow/data \
--flush-count 1000 --flush-interval 5
# Start with FlowDB + TTL + large memtable
cargo run --release --bin sipflow -- \
--engine flowdb \
--port 3000 --http-port 3001 \
--root /var/sipflow/data \
--ttl-secs 86400 \
--memtable-size-mb 256 \
--block-cache-capacity-mb 512
```
### CLI Options
| Option | Default | Description |
|---|---|---|
| `-a`, `--addr` | `0.0.0.0` | UDP bind address |
| `-p`, `--port` | `3000` | UDP receive port |
| `--http-port` | `3001` | HTTP query port |
| `-r`, `--root` | `./config/sipflow` | Data storage directory |
| `--engine` | `flowdb` | Storage engine: `flowdb` or `sqlite` |
| `--buffer-size` | `100000` | Channel buffer capacity |
| `--flush-count` | `1000` | Flush batch size (SQLite) |
| `--flush-interval` | `5` | Max flush interval in seconds (SQLite) |
| `--id-cache-size` | `8192` | CallID→ID cache size (SQLite) |
| `--ttl-secs` | none | FlowDB record TTL (0 = no expiry) |
| `--memtable-size-mb` | `64` | FlowDB memtable size |
| `--block-cache-capacity-mb` | `128` | FlowDB block cache capacity |
---
## RTP Quality Statistics
`query_media_stats()` returns per-(leg, source, SSRC) statistics:

| Field | Type | Description |
|---|---|---|
| `leg` | i32 | Media leg (0=A, 1=B) |
| `src` | String | Source address |
| `packet_count` | usize | Received packets |
| `lost_packets` | u64 | Lost packets |
| `expected_packets` | u64 | Expected packets (= received + lost) |
| `loss_percent` | f64 | Packet loss percentage |
| `jitter_ms` | Option\<f64\> | Jitter in milliseconds |
| `ssrc` | Option\<u32\> | RTP SSRC |
| `payload_type` | Option\<u8\> | RTP payload type |
| `clock_rate` | Option\<u32\> | Clock rate |
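
The relationship between these fields can be sketched as follows. `loss_percent` follows directly from the table (`expected = received + lost`). The jitter function uses the RFC 3550 interarrival jitter estimator, which is an assumption about how `rtp_stats.rs` computes `jitter_ms`; the function names are hypothetical, and RTP timestamp wraparound is ignored for brevity.

```rust
/// Packet loss as a percentage of expected packets.
fn loss_percent(received: u64, lost: u64) -> f64 {
    let expected = received + lost;
    if expected == 0 {
        0.0
    } else {
        lost as f64 * 100.0 / expected as f64
    }
}

/// RFC 3550 interarrival jitter, returned in milliseconds.
/// `packets` holds (arrival time in seconds, RTP timestamp) in arrival order.
fn jitter_ms(packets: &[(f64, u32)], clock_rate: u32) -> Option<f64> {
    if packets.len() < 2 || clock_rate == 0 {
        return None;
    }
    let mut jitter = 0.0f64; // running estimate, in RTP timestamp units
    let mut prev_transit: Option<f64> = None;
    for &(arrival_secs, rtp_ts) in packets {
        // Relative transit time: arrival minus media timestamp, in clock units.
        let transit = arrival_secs * clock_rate as f64 - rtp_ts as f64;
        if let Some(prev) = prev_transit {
            let d = (transit - prev).abs();
            jitter += (d - jitter) / 16.0; // smoothing gain of 1/16
        }
        prev_transit = Some(transit);
    }
    Some(jitter * 1000.0 / clock_rate as f64)
}
```

Returning `None` when fewer than two packets are available matches the `Option<f64>` type of `jitter_ms` in the table, since jitter needs at least one inter-packet interval.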