rustvani 0.1.1

Voice AI framework for Rust — real-time speech pipelines with STT, LLM, TTS, and Dhara conversation flows
<p align="center">
  <img src="assets/Gemini_Generated_Image_3ppqyo3ppqyo3ppq.png" alt="rustvani hero" width="100%" />
</p>

# rustvani — वाणी

**High-performance voice agent pipeline framework in Rust.** A from-scratch port of [Pipecat](https://github.com/pipecat-ai/pipecat) designed for production voice AI deployments where latency, memory, and concurrency matter.

> *vānī* (वाणी) — voice, speech, language

```
User speaks → VAD → STT → LLM → TTS → User hears
              500ms   ~     ~     ~      ← end-to-end, not minutes
```

---

## Why rustvani?

If you've built voice agents with Pipecat (Python), you know the architecture is excellent — frame-based pipelines, clean processor abstractions, interrupt handling. But Python's async runtime, GIL contention, and memory overhead become real problems at scale.

rustvani keeps Pipecat's architecture and fixes the runtime:

| | Pipecat (Python) | rustvani (Rust) |
|---|---|---|
| Runtime | asyncio + threads | Tokio (work-stealing, zero-cost futures) |
| VAD inference | Threadpool executor | `spawn_blocking` on true OS threads |
| Memory per session | ~80–150 MB | ~8–15 MB |
| Frame dispatch | Dynamic dict lookups, vtable overhead | Monomorphized generics, enum dispatch |
| Cold start | 2–5s (interpreter + imports) | <100ms (static binary) |
| Deployment | Docker + Python env | Single static binary, ~15 MB |

This isn't a wrapper or binding — it's a ground-up Rust implementation that mirrors Pipecat's mental model so you can reason about both codebases interchangeably.

---

## Architecture

```
┌──────────────────────────────────────────────────────────────────┐
│  PipelineTask                                                    │
│                                                                  │
│  [TaskSource] → Transport.Input → VAD → STT → UserAgg →        │
│                 LLM → AssistantAgg → TTS → Transport.Output →   │
│                 [TaskSink]                                       │
│                                                                  │
│  Upstream  ◄────────────────────────────────────────────────     │
│  Downstream ────────────────────────────────────────────────►    │
└──────────────────────────────────────────────────────────────────┘
```

### Core concepts (1:1 with Pipecat)

**Frames** — Typed messages that flow through the pipeline. Three categories: System (lifecycle, VAD signals, audio input), Control (end, LLM response boundaries), and Data (transcriptions, LLM text, audio output, function calls). Every frame has a unique ID and optional sibling ID for broadcast deduplication.

**FrameProcessor** — The universal building block. Every component (VAD, STT, LLM, TTS, transport, pipeline itself) is a `FrameProcessor`. Each has two async queues: an input queue (system frames get priority) and a process queue (data/control frames). This two-queue design ensures lifecycle frames like `InterruptionFrame` are never blocked behind a backlog of audio chunks.
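
The priority rule can be illustrated with a small std-only sketch. It is not rustvani's implementation: the `Frame` and `TwoQueue` types below are hypothetical, and the real queues are async. It only shows why a system frame overtakes audio that was queued earlier.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified frame categories (not rustvani's actual types).
#[derive(Debug, Clone, PartialEq)]
enum Frame {
    System(&'static str),
    Data(&'static str),
}

/// Two FIFO queues: system frames are always drained before data frames.
#[derive(Default)]
struct TwoQueue {
    system: VecDeque<Frame>,
    process: VecDeque<Frame>,
}

impl TwoQueue {
    /// Routes a frame to the queue that matches its category.
    fn push(&mut self, frame: Frame) {
        match frame {
            Frame::System(_) => self.system.push_back(frame),
            Frame::Data(_) => self.process.push_back(frame),
        }
    }

    /// Returns the next frame, preferring system frames over queued data.
    fn pop(&mut self) -> Option<Frame> {
        self.system.pop_front().or_else(|| self.process.pop_front())
    }
}
```

If two audio frames are pushed before an interruption frame, `pop` still returns the interruption first and then the audio in arrival order.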

**Pipeline** — Chains processors into a linked list with source/sink sentinels. A Pipeline IS a FrameProcessor, so pipelines nest inside pipelines.
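
A minimal sketch of the nesting idea, assuming a synchronous, string-based `Processor` trait invented for illustration (the real `FrameProcessor` is async and frame-based):

```rust
/// Hypothetical minimal processor trait; names here are assumptions.
trait Processor {
    fn process(&self, input: String) -> String;
}

/// A stage that appends its own tag so the route is visible in the output.
struct Tag(&'static str);

impl Processor for Tag {
    fn process(&self, input: String) -> String {
        format!("{input}>{}", self.0)
    }
}

/// A pipeline owns an ordered list of processors and is itself a processor.
struct Pipeline(Vec<Box<dyn Processor>>);

impl Processor for Pipeline {
    fn process(&self, input: String) -> String {
        // Feed the output of each stage into the next one, in order.
        self.0.iter().fold(input, |frame, p| p.process(frame))
    }
}
```

Because `Pipeline` implements the same trait as its stages, a `Pipeline` can be boxed and placed inside another `Pipeline` without special handling.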

**PipelineTask** — Lifecycle wrapper. Manages setup, StartFrame injection, heartbeats, idle timeout, and graceful shutdown. Exposes callback hooks (`on_pipeline_started`, `on_pipeline_finished`, `on_idle_timeout`) and a `push_sender()` for external frame injection from your transport.

### Where rustvani diverges from Pipecat

**Enum-based frames instead of class hierarchy.** Python uses inheritance (`class TranscriptionFrame(DataFrame)`). Rust uses `Frame { inner: FrameInner }` where `FrameInner` is a three-level enum (`System | Control | Data`). Pattern matching replaces `isinstance()` checks — the compiler enforces exhaustiveness.
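
As a rough sketch of the shape described above: the variant names below are a reduced, hypothetical subset, not the crate's full taxonomy.

```rust
/// Hypothetical, reduced frame taxonomy for illustration only.
#[derive(Debug)]
enum SystemFrame { Start, Interruption }

#[derive(Debug)]
enum ControlFrame { End }

#[derive(Debug)]
enum DataFrame { Transcription(String), LlmText(String) }

#[derive(Debug)]
enum FrameInner {
    System(SystemFrame),
    Control(ControlFrame),
    Data(DataFrame),
}

struct Frame { id: u64, inner: FrameInner }

/// Exhaustive match: adding a variant without handling it is a compile error.
fn describe(frame: &Frame) -> String {
    match &frame.inner {
        FrameInner::System(SystemFrame::Start) => format!("#{} start", frame.id),
        FrameInner::System(SystemFrame::Interruption) => format!("#{} interruption", frame.id),
        FrameInner::Control(ControlFrame::End) => format!("#{} end", frame.id),
        FrameInner::Data(DataFrame::Transcription(t)) => format!("#{} transcription: {t}", frame.id),
        FrameInner::Data(DataFrame::LlmText(t)) => format!("#{} llm text: {t}", frame.id),
    }
}
```

There is no wildcard arm in `describe`, so the compiler rejects the function as soon as a new variant is added and left unhandled.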

**Monomorphized adapters.** The `LLMAdapter` trait is used as a generic bound (`struct Handler<A: LLMAdapter>`), not a trait object. The compiler generates specialized code per provider — no vtable indirection on the hot path.
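
The difference between the two dispatch styles can be sketched as follows. The trait method `provider` and the two adapter structs are assumptions made for this example. Only the `Handler<A: LLMAdapter>` shape comes from the text above.

```rust
/// Hypothetical stand-in for the adapter trait; the method is an assumption.
trait LLMAdapter {
    fn provider(&self) -> &'static str;
}

struct OpenAiAdapter;
struct SarvamAdapter;

impl LLMAdapter for OpenAiAdapter {
    fn provider(&self) -> &'static str { "openai" }
}

impl LLMAdapter for SarvamAdapter {
    fn provider(&self) -> &'static str { "sarvam" }
}

/// Generic bound: the compiler emits one specialized `Handler` per adapter type.
struct Handler<A: LLMAdapter> { adapter: A }

impl<A: LLMAdapter> Handler<A> {
    fn describe(&self) -> String {
        // Statically dispatched call: the target is known at compile time.
        format!("handler for {}", self.adapter.provider())
    }
}

/// For contrast: a trait object resolves `provider` through a vtable at runtime.
fn describe_dyn(adapter: &dyn LLMAdapter) -> String {
    format!("handler for {}", adapter.provider())
}
```

Both forms return the same result. The generic form trades a larger binary (one copy per adapter) for calls the optimizer can inline.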

**`Arc<Mutex<>>` for model state.** Silero VAD and Piper TTS models need `&mut self` for `session.run()`. They're wrapped in `Arc<Mutex<SileroVadInner>>` so multiple pipeline instances can share model weights without duplicating them in memory. The mutex is held only during the blocking inference call inside `spawn_blocking`.
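
A std-only sketch of the sharing pattern, using plain threads in place of `spawn_blocking` and a hypothetical `Model` type whose `run` method needs `&mut self`:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical model whose inference call needs `&mut self`.
struct Model { calls: u32 }

impl Model {
    fn run(&mut self, input: f32) -> f32 {
        self.calls += 1;
        input * 0.5
    }
}

/// Runs inference from several threads against one shared model instance.
fn run_shared(inputs: Vec<f32>) -> (Vec<f32>, u32) {
    let model = Arc::new(Mutex::new(Model { calls: 0 }));
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|x| {
            let model = Arc::clone(&model);
            thread::spawn(move || {
                // The lock is held only for the duration of the inference call.
                let mut guard = model.lock().unwrap();
                guard.run(x)
            })
        })
        .collect();
    // Joining in spawn order keeps outputs aligned with inputs.
    let outputs: Vec<f32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    let calls = model.lock().unwrap().calls;
    (outputs, calls)
}
```

Only one copy of the model exists no matter how many callers there are. The cost is that inference calls are serialized by the mutex.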

**Direct mode processors.** Source/sink sentinels skip the input/process task loops entirely (`enable_direct_mode: true`). Frames are processed inline — no queue overhead for infrastructure processors that just route.

---

## Modules

```
src/
├── adapters/          LLM provider adapters (OpenAI wire format)
│   └── schemas/       Provider-agnostic tool/function schemas
├── audio_process/     Noise suppression (RNNoise) + resampling (rubato)
├── context/           Shared LLMContext (messages, tools, tool_choice)
├── dhara/             Conversation flow engine (node-based state machine)
├── frames/            Frame types, FrameProcessor, priority queues
├── pipeline/          Pipeline assembly + PipelineTask lifecycle
├── processors/        LLM user/assistant aggregators
├── ravi/              RAVI protocol (real-time audio/video interface)
├── services/
│   ├── llm/           OpenAI + Sarvam LLM (SSE streaming, function calling)
│   ├── stt/           Sarvam STT (WebSocket streaming)
│   └── tts/           Sarvam TTS (WebSocket) + Piper TTS (local ONNX)
├── tools/             Built-in tools (Neon Postgres with pgvector)
├── transport/         WebSocket transport (axum) + base I/O
├── utils/             Sentence splitter, text preprocessor
└── vad/               Silero VAD (ONNX) + state machine
```

---

## Features

### Voice Activity Detection
- **Silero VAD** — two backends:
  - **Native** (default): pure Rust engine with zero ONNX Runtime dependency, ~17 MB footprint, 16kHz only. Loads weights from a flat binary (`silero_vad_16k.bin`).
  - **ONNX Runtime** (`ort` crate): supports 8kHz + 16kHz, same model as Pipecat
- 4-state machine: `Quiet → Starting → Speaking → Stopping → Quiet`
- Configurable confidence threshold, start/stop durations, minimum volume
- Volume calculation using dBFS approximation of EBU R128
- Inference on `spawn_blocking` — never blocks the Tokio executor
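
The four-state machine can be sketched as below. This is a simplified model under stated assumptions: start/stop durations are expressed as frame counts rather than seconds, minimum volume is ignored, and the `Vad` struct and its field names are hypothetical.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum VadState { Quiet, Starting, Speaking, Stopping }

/// Hypothetical parameters; field names are assumptions, not the crate's API.
struct Vad {
    state: VadState,
    threshold: f32,    // minimum confidence counted as speech
    start_frames: u32, // consecutive speech frames needed to enter Speaking
    stop_frames: u32,  // consecutive silent frames needed to return to Quiet
    run: u32,          // frames counted in the current Starting/Stopping phase
}

impl Vad {
    fn new(threshold: f32, start_frames: u32, stop_frames: u32) -> Self {
        Vad { state: VadState::Quiet, threshold, start_frames, stop_frames, run: 0 }
    }

    /// Advances the state machine by one frame of model confidence.
    fn step(&mut self, confidence: f32) -> VadState {
        let speech = confidence >= self.threshold;
        self.state = match (self.state, speech) {
            (VadState::Quiet, true) => { self.run = 1; self.after_start() }
            (VadState::Quiet, false) => VadState::Quiet,
            (VadState::Starting, true) => { self.run += 1; self.after_start() }
            (VadState::Starting, false) => VadState::Quiet, // blip, not speech
            (VadState::Speaking, true) => VadState::Speaking,
            (VadState::Speaking, false) => { self.run = 1; self.after_stop() }
            (VadState::Stopping, true) => VadState::Speaking, // short pause only
            (VadState::Stopping, false) => { self.run += 1; self.after_stop() }
        };
        self.state
    }

    fn after_start(&self) -> VadState {
        if self.run >= self.start_frames { VadState::Speaking } else { VadState::Starting }
    }

    fn after_stop(&self) -> VadState {
        if self.run >= self.stop_frames { VadState::Quiet } else { VadState::Stopping }
    }
}
```

The intermediate `Starting` and `Stopping` states act as debounce: a single loud frame does not start a turn, and a brief pause does not end one.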

### Speech-to-Text
- **Sarvam AI** streaming WebSocket STT (`saaras:v3`)
- Supports transcription, translation, verbatim, transliteration, and codemix modes
- Multi-language: `ml-IN`, `hi-IN`, `en-IN`, auto-detect (`unknown`)
- Integrated **RNNoise** noise suppression (pure Rust via `nnnoiseless`)
- Transparent resampling if source rate ≠ 48 kHz (via `rubato`)

### Large Language Models
- **OpenAI-compatible** API with SSE streaming
- **Sarvam LLM** (`sarvam-m`, `sarvam-30b`) with optional CoT thinking mode
- Full function calling with re-invocation loop (model calls tool → execute → re-invoke)
- Configurable max tool rounds to prevent infinite loops
- Provider adapter system — add new providers by implementing `LLMAdapter`
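
The re-invocation loop with a round cap might look roughly like this. The `Reply` enum, `run_turn`, and the closure signatures are hypothetical; the real handler streams over SSE and works with `LLMContext`, not a list of strings.

```rust
/// Hypothetical model reply: either final text or a tool call request.
enum Reply {
    Text(String),
    ToolCall { name: String, args: String },
}

/// Re-invokes the model after each tool result, allowing at most
/// `max_tool_rounds` tool executions before giving up.
fn run_turn(
    mut model: impl FnMut(&[String]) -> Reply,
    tool: impl Fn(&str, &str) -> String,
    max_tool_rounds: usize,
) -> Result<String, String> {
    let mut tool_results: Vec<String> = Vec::new();
    loop {
        match model(&tool_results) {
            // The model produced a final answer: the turn is over.
            Reply::Text(text) => return Ok(text),
            Reply::ToolCall { name, args } => {
                if tool_results.len() == max_tool_rounds {
                    return Err(format!("exceeded {max_tool_rounds} tool rounds"));
                }
                // Execute the tool and feed its result into the next invocation.
                tool_results.push(tool(&name, &args));
            }
        }
    }
}
```

Without the cap, a model that keeps requesting tools would never return control to the pipeline.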

### Text-to-Speech
- **Sarvam Bulbul** TTS (v2, v3-beta, v3) — WebSocket streaming with 25+ voices
- **Piper TTS** — fully local ONNX inference, zero network calls
  - espeak-ng phonemization → Piper ONNX → chunked PCM streaming
  - Multiple quality levels (Low/Medium/High) with different latency/quality tradeoffs
  - Shared model across pipeline instances via `Arc<Mutex<PiperModel>>`
- Sentence-aware text buffering with abbreviation detection (Mr., Dr., IPC., etc.)
- Indian numbering system preprocessing for TTS (10000 → 10,000)
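
Sentence-aware buffering with abbreviation detection can be approximated as follows. The abbreviation list and the `take_sentences` function are assumptions for this sketch. The crate's own splitter lives in `utils/` and may use different rules.

```rust
/// Hypothetical abbreviation list; the crate's actual list may be longer.
const ABBREVIATIONS: [&str; 3] = ["Mr.", "Dr.", "IPC."];

/// Splits complete sentences off `buffer`, leaving any unfinished tail in place.
fn take_sentences(buffer: &mut String) -> Vec<String> {
    let mut sentences = Vec::new();
    let mut start = 0;
    for (i, ch) in buffer.char_indices() {
        if !matches!(ch, '.' | '!' | '?') {
            continue;
        }
        let end = i + ch.len_utf8();
        // Require trailing whitespace: at the end of the buffer more text may still arrive.
        let followed_by_space = buffer[end..].chars().next().map_or(false, char::is_whitespace);
        let candidate = &buffer[start..end];
        // "Dr." ends in a period but does not end the sentence.
        let last_word = candidate.split_whitespace().last().unwrap_or("");
        let is_abbreviation = ABBREVIATIONS.iter().any(|a| *a == last_word);
        if followed_by_space && !is_abbreviation {
            sentences.push(candidate.trim().to_string());
            start = end;
        }
    }
    *buffer = buffer[start..].trim_start().to_string();
    sentences
}
```

Streaming LLM tokens can be appended to the buffer as they arrive. Each call returns only the sentences that are safe to hand to TTS.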

### Function Calling & Tools
- **FunctionRegistry** with two handler types:
  - **Simple** — returns `String`, goes directly to LLM context
  - **Data** — returns `ToolCallOutput { summary, full_data }`, summary to LLM, raw data as a downstream frame for UI/logging
- **Neon Postgres built-in tool** (cacheable, lifecycle-managed):
  - `pg_schema` — cached schema introspection (tables, columns, PKs, vector columns)
  - `pg_query` — parameterized SELECT with result set caching
  - `pg_refine` — structured filter → parameterized SQL (no raw SQL from LLM)
  - `pg_vector_search` — pgvector similarity search with result set scoping
- **BuiltinTool trait** with full lifecycle: `on_start(cancel_token)`, `on_stop()`, `on_cancel()`
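
To illustrate the "structured filter → parameterized SQL" idea behind `pg_refine`, here is a minimal sketch. The `Filter` and `Op` types and the `build_query` function are hypothetical; the real tool's filter schema is not shown in this README.

```rust
/// Hypothetical filter shape; the real `pg_refine` schema may differ.
enum Op { Eq, Gt, Lt }

struct Filter { column: String, op: Op, value: String }

/// Builds a parameterized SELECT. Values never appear in the SQL text.
/// `table` is assumed to come from trusted code, not from the LLM.
fn build_query(
    table: &str,
    allowed_columns: &[&str],
    filters: &[Filter],
) -> Result<(String, Vec<String>), String> {
    let mut clauses = Vec::new();
    let mut params = Vec::new();
    for f in filters {
        // Column names cannot be bound as parameters, so check an allow-list.
        if !allowed_columns.iter().any(|c| *c == f.column) {
            return Err(format!("unknown column: {}", f.column));
        }
        let op = match f.op { Op::Eq => "=", Op::Gt => ">", Op::Lt => "<" };
        params.push(f.value.clone());
        // Placeholders are numbered in Postgres style: $1, $2, ...
        clauses.push(format!("{} {} ${}", f.column, op, params.len()));
    }
    let mut sql = format!("SELECT * FROM {table}");
    if !clauses.is_empty() {
        sql.push_str(" WHERE ");
        sql.push_str(&clauses.join(" AND "));
    }
    Ok((sql, params))
}
```

The LLM only chooses columns, operators, and values. The SQL text is assembled from a fixed template, so a hostile value cannot change the query structure.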

### Dhara — Conversation Flow Engine

> *dhara* (ധാര) — flow, stream

Node-based conversation flow system where each node defines its own system prompt, tools, and context strategy. Tool handlers return `TransitionResult::Stay` or `TransitionResult::Transition { next_node }` to control flow.

```rust
let mut dhara = DharaManager::new(context.clone(), registry.clone());

dhara.register_node("greeting", greeting_node, vec![
    ("check_weather", weather_handler),
    ("transfer_to_billing", billing_handler),  // → transitions to "billing" node
]);
dhara.register_node("billing", billing_node, vec![...]);

dhara.set_initial_node("greeting");

let mut llm = OpenAILLMHandler::with_shared_registry(config, registry);
llm.set_transition_hook(dhara.create_transition_hook());
```
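
How a handler's return value drives the flow can be sketched with std only. Apart from the `TransitionResult` variants named above, everything here (`Flow`, `ToolHandler`, `demo_flow`, the handler bodies) is a hypothetical stand-in, not `DharaManager`'s API.

```rust
use std::collections::HashMap;

/// The two outcomes named above; the payload type is an assumption.
enum TransitionResult {
    Stay,
    Transition { next_node: String },
}

type ToolHandler = fn(&str) -> TransitionResult;

fn weather_handler(_args: &str) -> TransitionResult {
    TransitionResult::Stay
}

fn billing_handler(_args: &str) -> TransitionResult {
    TransitionResult::Transition { next_node: "billing".to_string() }
}

/// Hypothetical flow: each node maps tool names to handlers.
struct Flow {
    current: String,
    nodes: HashMap<String, HashMap<String, ToolHandler>>,
}

impl Flow {
    /// Runs a tool of the current node and returns the node that is active afterwards.
    fn call_tool(&mut self, tool: &str, args: &str) -> Result<String, String> {
        let handler = self
            .nodes
            .get(&self.current)
            .and_then(|tools| tools.get(tool))
            .copied()
            .ok_or_else(|| format!("tool `{tool}` is not registered on node `{}`", self.current))?;
        if let TransitionResult::Transition { next_node } = handler(args) {
            self.current = next_node;
        }
        Ok(self.current.clone())
    }
}

/// Builds a two-node flow: "greeting" has two tools, "billing" has none.
fn demo_flow() -> Flow {
    let mut greeting: HashMap<String, ToolHandler> = HashMap::new();
    greeting.insert("check_weather".to_string(), weather_handler);
    greeting.insert("transfer_to_billing".to_string(), billing_handler);
    let mut nodes = HashMap::new();
    nodes.insert("greeting".to_string(), greeting);
    nodes.insert("billing".to_string(), HashMap::new());
    Flow { current: "greeting".to_string(), nodes }
}
```

After a transition, only the new node's tools are reachable, which is what keeps each stage of the conversation scoped to its own prompt and tool set.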

### RAVI Protocol

> *ravi* (रवि) — the sun

Rust-native real-time protocol layer, spiritually equivalent to RTVI. Handles client handshake (`client-ready` / `bot-ready`), speaking state signals, transcription forwarding, LLM token streaming, function call events, and `send-text` injection.

### Transport
- **WebSocket** transport via `axum` + `tokio-tungstenite`
- Binary frames for PCM audio, text frames for RAVI protocol messages
- Configurable audio chunking (10ms multiples for smooth playback)
- Bot speaking state management with automatic `BotStartedSpeaking` / `BotStoppedSpeaking` broadcast
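
The chunk-size arithmetic for 10 ms multiples is simple enough to show directly. The function names are hypothetical and the sketch assumes 16-bit PCM (2 bytes per sample):

```rust
/// Bytes in `ms` milliseconds of 16-bit PCM.
/// `ms` is rounded down to a multiple of 10, with a minimum of 10.
fn chunk_bytes(sample_rate: u32, channels: u32, ms: u32) -> usize {
    let ten_ms_units = (ms / 10).max(1);
    let samples_per_10ms = sample_rate / 100;
    (samples_per_10ms * ten_ms_units * channels * 2) as usize
}

/// Splits PCM bytes into fixed-size chunks; the last chunk may be shorter.
/// `chunk_size` must be non-zero.
fn chunk_audio(pcm: &[u8], chunk_size: usize) -> Vec<&[u8]> {
    pcm.chunks(chunk_size).collect()
}
```

For mono 16 kHz audio, 10 ms is 160 samples or 320 bytes, so a 20 ms chunk is 640 bytes.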

---

## Quick Start

```rust
use rustvani::*;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // 1. Create shared context
    let context = shared_context(Some(
        "You are a helpful voice assistant.".into()
    ));

    // 2. Build services
    let vad = SileroVad::new(16_000).expect("VAD model");

    let stt = SarvamSttHandler::new(SarvamSttConfig {
        api_key: std::env::var("SARVAM_API_KEY").unwrap(),
        ..Default::default()
    }).into_processor();

    let llm = OpenAILLMHandler::new(OpenAILLMConfig {
        api_key: std::env::var("OPENAI_API_KEY").unwrap(),
        model: "gpt-4.1".into(),
        ..Default::default()
    }).into_processor();

    let tts = SarvamTtsHandler::new(SarvamTtsConfig {
        api_key: std::env::var("SARVAM_API_KEY").unwrap(),
        model: "bulbul:v2".into(),
        ..Default::default()
    }).unwrap().into_processor();

    // 3. Build transport
    let transport = WebSocketTransport::new("ws", WebSocketParams {
        transport: TransportParams {
            audio_in_enabled: true,
            audio_in_sample_rate: Some(16_000),
            vad_analyzer: Some(Arc::new(vad)),
            ..Default::default()
        },
    });

    // 4. Build aggregators
    let user_agg = LLMUserAggregator::new(context.clone());
    let assistant_agg = LLMAssistantAggregator::new(context.clone());

    // 5. Assemble pipeline
    let pipeline = vec![
        transport.input(),
        stt,
        user_agg,
        llm,
        assistant_agg,
        tts,
        transport.output(),
    ];

    let task = PipelineTask::new(pipeline, PipelineParams {
        allow_interruptions: true,
        ..Default::default()
    });

    // 6. Run
    task.run(system_clock(), None).await.unwrap();
}
```

---

## Function Calling

```rust
use rustvani::services::llm::FunctionRegistry;
use rustvani::services::llm::function_registry::ToolCallOutput;
use rustvani::adapters::schemas::{FunctionSchema, ToolsSchema};
use serde_json::json;

// Define tool schemas
let tools = ToolsSchema::new(vec![
    FunctionSchema::new("get_weather", "Get weather for a city")
        .with_parameters(json!({
            "type": "object",
            "properties": {
                "city": { "type": "string" }
            },
            "required": ["city"]
        })),
]);

// Register handlers
let mut registry = FunctionRegistry::new();

// Simple handler — result goes directly to LLM context
registry.register("get_weather", |args: String| async move {
    let parsed: serde_json::Value = serde_json::from_str(&args).unwrap();
    let city = parsed["city"].as_str().unwrap_or("unknown");
    format!("Weather in {}: 28°C, partly cloudy", city)
});

// Data handler — summary to LLM, full data to UI
registry.register_data("search_cases", |args: String| async move {
    let rows = vec![json!({"id": 1, "title": "Case A"})];
    ToolCallOutput::with_data(
        format!("Found {} cases", rows.len()),
        json!(rows),
    )
});

// Wire into LLM handler (`config` is an `OpenAILLMConfig`, as in Quick Start)
let llm = OpenAILLMHandler::with_registry(config, registry);
```

---

## Piper TTS (Local, No Network)

For deployments where network TTS adds unacceptable latency or isn't available:

```rust
use rustvani::services::tts::piper::*;

// Keep the handler around so its model can be shared before conversion.
let tts_handler = PiperTtsHandler::new(PiperTtsConfig {
    quality: PiperQuality::Medium,  // ~60 MB, 150–300ms/sentence
    model_dir: "./piper-models".into(),
    ..Default::default()
})?;

// Share model across multiple pipelines to save memory
let shared = tts_handler.shared_model();
let tts = tts_handler.into_processor();

// `config` is a second `PiperTtsConfig` for the other pipeline
let tts2 = PiperTtsHandler::with_shared_model(config, shared).into_processor();
```

Requires `espeak-ng` installed for phonemization (`apt install espeak-ng`).

---

## Built-in Postgres Tool

Attach a Neon Postgres database to your LLM with schema caching, result set management, structured filters (no raw SQL from the LLM), and pgvector similarity search:

```rust
use rustvani::tools::NeonPostgresTool;
use std::sync::Arc;

let pg = Arc::new(NeonPostgresTool::from_env()); // reads DATABASE_URL

// `config` is an `OpenAILLMConfig`, as in Quick Start
let mut llm = OpenAILLMHandler::new(config);
llm.add_tool(pg.clone());

// Tool schemas are auto-registered. Schema is cached on StartFrame.
// pg_schema, pg_query, pg_refine, pg_vector_search are available to the LLM.
```

---

## Audio Processing

**RNNoise noise suppression** (pure Rust via `nnnoiseless`) is integrated into the STT pipeline. It buffers to 480-sample frames at 48 kHz, runs the denoiser, and resamples back to the source rate transparently.

```rust
use rustvani::audio_process::noisefilter::RNNoiseFilter;

let mut nf = RNNoiseFilter::new(16_000);  // auto-resamples 16k ↔ 48k
let clean = nf.filter(&noisy_pcm_i16);
let tail = nf.flush();  // drain at end of utterance
nf.reset();             // clean slate for next utterance
```
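
The "buffer to 480-sample frames" step can be sketched on its own, leaving out resampling and the denoiser. `FrameBuffer` is a hypothetical helper working on `f32` samples, not the internals of `RNNoiseFilter`.

```rust
const FRAME: usize = 480; // 10 ms of audio at 48 kHz

/// Accumulates samples and hands out complete 480-sample frames.
#[derive(Default)]
struct FrameBuffer { pending: Vec<f32> }

impl FrameBuffer {
    /// Appends `input` and returns every complete frame now available.
    fn push(&mut self, input: &[f32]) -> Vec<[f32; FRAME]> {
        self.pending.extend_from_slice(input);
        let full = self.pending.len() / FRAME * FRAME;
        let frames: Vec<[f32; FRAME]> = self.pending[..full]
            .chunks_exact(FRAME)
            .map(|chunk| {
                let mut frame = [0.0; FRAME];
                frame.copy_from_slice(chunk);
                frame
            })
            .collect();
        // Keep only the leftover samples for the next call.
        self.pending.drain(..full);
        frames
    }

    /// Zero-pads and returns the final partial frame, if any.
    fn flush(&mut self) -> Option<[f32; FRAME]> {
        if self.pending.is_empty() {
            return None;
        }
        let mut frame = [0.0; FRAME];
        frame[..self.pending.len()].copy_from_slice(&self.pending);
        self.pending.clear();
        Some(frame)
    }
}
```

This is why the filter API has a separate `flush()`: input chunks rarely align with 480-sample boundaries, so some samples are always held back until the utterance ends.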

**Streaming resampler** (pure Rust via `rubato`) with quality presets from Quick (lowest latency) to VeryHigh (best quality).

---

## Deployment

rustvani compiles to a single static binary. No Python interpreter, no virtualenv, no dependency hell.

```dockerfile
FROM rust:1.82-slim AS builder
WORKDIR /app
COPY . .
RUN cargo build --release

FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y ca-certificates espeak-ng && rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/your-bot /usr/local/bin/
COPY silero.onnx /app/
WORKDIR /app
CMD ["your-bot"]
```

**Fly.io scale-to-zero:**
```toml
[build]
  dockerfile = "Dockerfile"

[[services]]
  internal_port = 8080
  auto_stop_machines = true
  auto_start_machines = true
  min_machines_running = 0
```

---

## Project Status

rustvani is in active development. The core pipeline, frame system, and all listed services are functional and tested in production for a Kerala government voice agent deployment.

**Working:**
- Full pipeline lifecycle (start, interruption, cancel, end)
- Silero VAD (native Rust + ONNX) with state machine
- Sarvam STT/TTS/LLM integration
- OpenAI LLM with function calling + re-invocation loop
- Piper TTS (local ONNX)
- Dhara conversation flow manager
- RAVI protocol
- Neon Postgres tool with pgvector
- WebSocket transport (axum)
- RNNoise noise suppression
- Audio resampling

**Planned:**
- Anthropic / Gemini LLM adapters
- WebRTC transport
- Deepgram / Whisper STT
- ElevenLabs / PlayHT TTS
- Metrics and observability
- `crates.io` publish

---

## For Pipecat Developers

If you know Pipecat, you already know rustvani. Here's the mapping:

| Pipecat (Python) | rustvani (Rust) |
|---|---|
| `FrameProcessor` | `FrameProcessor` (same name, same semantics) |
| `Frame` subclasses | `Frame { inner: FrameInner }` enum |
| `Pipeline(processors)` | `Pipeline::new(processors)` |
| `PipelineTask` | `PipelineTask` (same lifecycle hooks) |
| `OpenAILLMService` | `OpenAILLMHandler` |
| `LLMUserResponseAggregator` | `LLMUserAggregator` |
| `LLMAssistantResponseAggregator` | `LLMAssistantAggregator` |
| `BaseTransport` | `BaseTransport` |
| `SileroVADAnalyzer` | `SileroVad` |
| `FunctionCallHandler` | `FunctionRegistry` |
| `FlowManager` | `DharaManager` |
| `RTVIProcessor` | `RaviProcessor` |
| `context.set_tools()` | `LLMContext::with_tools()` |
| `@transport.event_handler("on_client_connected")` | `task.add_on_pipeline_started(...)` |

The frame flow, interrupt semantics, aggregator logic, and pipeline nesting all work identically. If you've debugged a Pipecat bot, you can debug a rustvani bot.

---

## License

rustvani is released under the BSD 2-Clause license. See [LICENSE](LICENSE).

Portions of this project are derived from [Pipecat](https://github.com/pipecat-ai/pipecat) by Daily and retain Pipecat's BSD-2-Clause license notice. See [THIRD_PARTY_NOTICES.md](THIRD_PARTY_NOTICES.md).

---

## Acknowledgements

rustvani wouldn't exist without [Pipecat](https://github.com/pipecat-ai/pipecat) by Daily. The architecture, frame taxonomy, aggregator patterns, and pipeline design are all derived from their excellent work. rustvani is a tribute to that design, rewritten for the constraints of production voice AI in Rust.

Built with [Sarvam AI](https://www.sarvam.ai/) for Indian language voice — STT, TTS, and LLM services that actually work for Malayalam, Hindi, and 10+ Indian languages.