# Live Queries
Live queries provide real-time data updates to clients using the `@live` directive. When underlying data changes, connected clients automatically receive updated results.
## Overview
Unlike traditional GraphQL subscriptions that push specific events, live queries automatically re-execute the query when relevant data mutations occur, sending the complete updated result to the client.
### Key Features
- **`@live` Directive**: Add to any query to make it "live"
- **WebSocket Delivery**: Real-time updates via `/graphql/live` endpoint
- **Invalidation-Based**: Mutations trigger query re-execution
- **Configurable Strategies**: Invalidation, polling, or hash-diff modes
- **Throttling**: Prevent flooding clients with too many updates
## Quick Start
### 1. Client: Send a Live Query
Connect to the WebSocket endpoint and subscribe with the `@live` directive:
```javascript
const ws = new WebSocket('ws://localhost:9000/graphql/live', 'graphql-transport-ws');
ws.onopen = () => {
// Initialize connection
ws.send(JSON.stringify({ type: 'connection_init' }));
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === 'connection_ack') {
// Subscribe with @live query
ws.send(JSON.stringify({
id: 'users-live',
type: 'subscribe',
payload: {
query: `query @live {
users {
id
name
status
}
}`
}
}));
}
if (msg.type === 'next') {
console.log('Received update:', msg.payload.data);
}
};
```
### 2. Proto: Configure Live Query Support
Mark RPC methods as live query compatible:
```protobuf
service UserService {
rpc ListUsers(Empty) returns (UserList) {
option (graphql.schema) = {
type: QUERY
name: "users"
};
option (graphql.live_query) = {
enabled: true
strategy: INVALIDATION
triggers: ["User.create", "User.update", "User.delete"]
throttle_ms: 100
};
}
rpc CreateUser(CreateUserRequest) returns (User) {
option (graphql.schema) = {
type: MUTATION
name: "createUser"
};
// Mutations don't need live_query config - they trigger invalidation
}
}
```
### 3. Server: Trigger Invalidation
After mutations, trigger invalidation to notify live queries:
```rust
use grpc_graphql_gateway::{InvalidationEvent, LiveQueryStore};
// In your mutation handler
async fn create_user(&self, req: CreateUserRequest) -> Result<User, Status> {
// ... create user logic ...
// Notify live queries that User data changed
if let Some(store) = &self.live_query_store {
store.invalidate(InvalidationEvent::new("User", "create"));
}
Ok(user)
}
```
## Live Query Strategies
### Invalidation (Recommended)
Re-execute query only when relevant mutations occur:
```protobuf
option (graphql.live_query) = {
enabled: true
strategy: INVALIDATION
triggers: ["User.update", "User.delete"]
};
```
### Polling
Periodically re-execute query at fixed intervals:
```protobuf
option (graphql.live_query) = {
enabled: true
strategy: POLLING
poll_interval_ms: 5000 // Every 5 seconds
};
```
### Hash Diff
Only send updates if result actually changed:
```protobuf
option (graphql.live_query) = {
enabled: true
strategy: HASH_DIFF
poll_interval_ms: 1000
};
```
## Configuration Options
| `enabled` | bool | Enable live query for this operation |
| `strategy` | enum | `INVALIDATION`, `POLLING`, or `HASH_DIFF` |
| `triggers` | string[] | Invalidation event patterns (e.g., "User.update") |
| `throttle_ms` | uint32 | Minimum time between updates (default: 100ms) |
| `poll_interval_ms` | uint32 | Polling interval for POLLING/HASH_DIFF strategies |
| `ttl_seconds` | uint32 | Auto-expire subscription after N seconds |
## API Reference
### Public Functions
```rust
// Check if query contains @live directive
pub fn has_live_directive(query: &str) -> bool;
// Strip @live directive for execution
pub fn strip_live_directive(query: &str) -> String;
// Create a shared live query store
pub fn create_live_query_store() -> SharedLiveQueryStore;
// Create with custom config
pub fn create_live_query_store_with_config(config: LiveQueryConfig) -> SharedLiveQueryStore;
```
### LiveQueryStore Methods
```rust
impl LiveQueryStore {
// Register a new live query subscription
pub fn register(&self, query: ActiveLiveQuery, sender: Sender<LiveQueryUpdate>) -> Result<(), LiveQueryError>;
// Unregister a subscription
pub fn unregister(&self, subscription_id: &str) -> Option<ActiveLiveQuery>;
// Trigger invalidation for matching subscriptions
pub fn invalidate(&self, event: InvalidationEvent) -> usize;
// Get current statistics
pub fn stats(&self) -> LiveQueryStats;
}
```
### InvalidationEvent
```rust
// Create an invalidation event
let event = InvalidationEvent::new("User", "update");
// With specific entity ID
let event = InvalidationEvent::with_id("User", "update", "user-123");
```
## WebSocket Protocol
The `/graphql/live` endpoint uses the `graphql-transport-ws` protocol:
### Client → Server
| `connection_init` | Initialize connection |
| `subscribe` | Start a live query subscription |
| `complete` | End a subscription |
| `ping` | Keep-alive ping |
### Server → Client
| `connection_ack` | Connection accepted |
| `next` | Query result (initial or update) |
| `error` | Error occurred |
| `complete` | Subscription ended |
| `pong` | Keep-alive response |
## Example: Full CRUD with Live Updates
See the complete example at `examples/live_query/`:
```bash
# Run the example
cargo run --example live_query
# In another terminal, run the WebSocket test
node examples/live_query/test_ws.js
```
The test demonstrates:
1. Initial live query returning 3 users
2. Delete mutation removing a user
3. Re-query showing 2 users
4. Create mutation adding a new user
5. Final query showing updated user list
## Best Practices
1. **Use Specific Triggers**: Only subscribe to relevant entity types
2. **Set Appropriate Throttle**: Prevent overwhelming clients (100-500ms)
3. **Use TTL for Temporary Subscriptions**: Auto-cleanup inactive queries
4. **Prefer Invalidation over Polling**: More efficient for most use cases
5. **Handle Reconnection**: Clients should re-subscribe after disconnect
---
## Advanced Features
The live query system includes 4 advanced features for optimizing bandwidth, performance, and user experience.
### 1. Filtered Live Queries
Apply server-side filtering to live queries to receive only relevant updates.
#### Usage
```graphql
# Only receive updates for online users
query @live {
users(status: ONLINE) {
users { id name }
total_count
}
}
```
#### Implementation
```rust
use grpc_graphql_gateway::{parse_query_arguments, matches_filter};
// Parse filter from query
let args = parse_query_arguments("users(status: ONLINE) @live");
// → { "status": "ONLINE" }
// Check if entity matches filter
let user = json!({"id": "1", "status": "ONLINE", "name": "Alice"});
if matches_filter(&args, &user) {
// Include in live query results
}
```
#### Benefits
- **50-90% bandwidth reduction** for filtered datasets
- Natural GraphQL query syntax
- No client-side filtering needed
---
### 2. Field-Level Invalidation
Track which specific fields changed and communicate this to clients for surgical updates.
#### Response Format
```javascript
{
id: "sub-123",
data: { user: { id: "1", name: "Alice Smith", age: 31 } },
changed_fields: ["user.name", "user.age"], // ← Only these changed!
is_initial: false,
revision: 5
}
```
#### Implementation
```rust
use grpc_graphql_gateway::detect_field_changes;
let old_data = json!({"user": {"name": "Alice", "age": 30}});
let new_data = json!({"user": {"name": "Alice Smith", "age": 31}});
let changes = detect_field_changes(&old_data, &new_data, "", 0, 10);
// changes = [
// FieldChange { field_path: "user.name", old_value: "Alice", new_value: "Alice Smith" },
// FieldChange { field_path: "user.age", old_value: 30, new_value: 31 }
// ]
```
#### Client-Side Usage
```javascript
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === 'next' && msg.payload.changed_fields) {
// Only update changed fields in UI
msg.payload.changed_fields.forEach(field => {
updateFieldInDOM(field, msg.payload.data);
});
}
};
```
#### Benefits
- **30-70% bandwidth reduction** when few fields change
- **Surgical UI updates** - only re-render changed components
- Reduced client-side processing overhead
---
### 3. Batch Invalidation
Merge multiple rapid invalidation events into a single update to reduce network traffic.
#### Configuration
```rust
use grpc_graphql_gateway::BatchInvalidationConfig;
let config = BatchInvalidationConfig {
enabled: true,
debounce_ms: 50, // Wait 50ms before flushing
max_batch_size: 100, // Auto-flush at 100 events
max_wait_ms: 500, // Force flush after 500ms max
};
```
#### How It Works
```
Without batching:
━━━━━━━━━━━━━━━━━━━━━━━
Event 1 (0ms) → Update 1
Event 2 (10ms) → Update 2
Event 3 (20ms) → Update 3
Event 4 (30ms) → Update 4
Event 5 (40ms) → Update 5
━━━━━━━━━━━━━━━━━━━━━━━
Result: 5 updates sent
With batching (100ms throttle):
━━━━━━━━━━━━━━━━━━━━━━━
Events 1-5 (0-40ms)
↓ (wait 100ms)
Single merged update
━━━━━━━━━━━━━━━━━━━━━━━
Result: 1 update sent
```
#### Proto Configuration
```protobuf
option (graphql.live_query) = {
enabled: true
strategy: INVALIDATION
throttle_ms: 100 // ← Enables batching
triggers: ["User.create", "User.update"]
};
```
#### Benefits
- **70-95% fewer network requests** during high-frequency updates
- Lower client processing overhead
- Better performance for rapidly changing data
---
### 4. Client-Side Caching Hints
Send cache control directives to help clients optimize caching based on data volatility.
#### Response Format
```javascript
{
id: "sub-123",
data: { user: { name: "Alice" } },
cache_control: {
max_age: 300, // Cache for 5 minutes
must_revalidate: true,
etag: "abc123def456" // For efficient revalidation
}
}
```
#### Implementation
```rust
use grpc_graphql_gateway::{generate_cache_control, DataVolatility};
// Generate cache control based on data type
let cache = generate_cache_control(
DataVolatility::Low, // User profiles change infrequently
Some("etag-user-123".to_string())
);
// Result:
// CacheControl {
// max_age: 300, // 5 minutes
// must_revalidate: true,
// etag: Some("etag-user-123")
// }
```
#### Data Volatility Levels
| `VeryHigh` | 0s (no cache) | Stock prices, real-time metrics |
| `High` | 5s | User online status, live counts |
| `Medium` | 30s | Notification counts, activity feeds |
| `Low` | 5 minutes | User profiles, post content |
| `VeryLow` | 1 hour | Settings, configuration data |
#### Client-Side Implementation
```javascript
const cache = new Map();
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === 'next' && msg.payload.cache_control) {
const { max_age, etag } = msg.payload.cache_control;
// Store in cache with expiration
cache.set(msg.id, {
data: msg.payload.data,
etag: etag,
expires: Date.now() + (max_age * 1000)
});
}
};
```
#### Benefits
- **40-80% reduced server load** through client caching
- Faster perceived performance
- Automatic cache invalidation on updates
---
## Advanced Features API Reference
### Functions
```rust
// Filter Support - Feature #1
pub fn parse_query_arguments(query: &str) -> HashMap<String, String>;
pub fn matches_filter(filter: &HashMap<String, String>, data: &Value) -> bool;
// Field-Level Changes - Feature #2
pub fn detect_field_changes(
old: &Value,
new: &Value,
path: &str,
depth: usize,
max_depth: usize
) -> Vec<FieldChange>;
// Cache Control - Feature #4
pub fn generate_cache_control(
volatility: DataVolatility,
etag: Option<String>
) -> CacheControl;
```
### Types
```rust
// Cache Control
pub struct CacheControl {
pub max_age: u32,
pub public: bool,
pub must_revalidate: bool,
pub etag: Option<String>,
}
// Field Change
pub struct FieldChange {
pub field_path: String,
pub old_value: Option<Value>,
pub new_value: Value,
}
// Batch Configuration
pub struct BatchInvalidationConfig {
pub enabled: bool,
pub debounce_ms: u64,
pub max_batch_size: usize,
pub max_wait_ms: u64,
}
// Data Volatility
pub enum DataVolatility {
VeryHigh, // Changes multiple times per second
High, // Changes every few seconds
Medium, // Changes every minute
Low, // Changes hourly
VeryLow, // Changes daily or less
}
```
### Enhanced LiveQueryUpdate
```rust
pub struct LiveQueryUpdate {
pub id: String,
pub data: serde_json::Value,
pub is_initial: bool,
pub revision: u64,
// Advanced features (all optional)
pub cache_control: Option<CacheControl>,
pub changed_fields: Option<Vec<String>>,
pub batched: Option<bool>,
pub timestamp: Option<u64>,
}
```
---
## Performance Comparison
### Real-World Scenario
**Setup:** Live dashboard with 1000 users, 10 fields each, 60 updates/minute
| Users sent | 1000 | 100 (filtered) | 90% reduction |
| Fields/user | 10 | 2 (changed only) | 80% reduction |
| Updates/min | 60 | 10 (batched) | 83% reduction |
| Cache hits | 0% | 50% | 50% less load |
| **Total data/min** | **~2.3 MB** | **~23 KB** | **99% reduction** |
---
## Complete Example
A comprehensive example demonstrating all 4 features is available:
```bash
# Run the server
cargo run --example live_query
# Test all advanced features
cd examples/live_query
node test_advanced_features.js
```
For detailed documentation and examples, see:
- [`examples/live_query/ADVANCED_FEATURES.md`](../../examples/live_query/ADVANCED_FEATURES.md)
- [`examples/live_query/VISUAL_GUIDE.md`](../../examples/live_query/VISUAL_GUIDE.md)
---
## Migration Guide
### Adding Filtered Queries
**Before:**
```javascript
query @live {
users {
users { id name status }
}
}
```
**After:**
```javascript
query @live {
users(status: ONLINE) { // ← Add filter
users { id name status }
}
}
```
### Using Field-Level Updates
**Before:**
```javascript
if (msg.type === 'next') {
// Update entire component
updateUserComponent(msg.payload.data.user);
}
```
**After:**
```javascript
if (msg.type === 'next') {
if (msg.payload.changed_fields) {
// Update only changed fields
msg.payload.changed_fields.forEach(field => {
updateField(field, msg.payload.data);
});
} else {
// Initial load
updateUserComponent(msg.payload.data.user);
}
}
```
### Enabling Batching
Simply increase the throttle in your proto config:
```protobuf
option (graphql.live_query) = {
throttle_ms: 100 // ← Increase from 0 to enable batching
};
```
---
## Troubleshooting
### Filtered queries not working?
- Verify filter syntax: `key: value` format (e.g., `status: ONLINE`)
- Filters are case-sensitive
- Check that entity data contains the filter fields
### Too many updates still?
- Increase `throttle_ms` for more aggressive batching
- Add more specific filters to reduce result set
- Review your invalidation triggers
### Cache not working?
- Ensure client respects `max_age` header
- Check that `cache_control` is present in response
- Verify ETag handling on client side
### Changed fields not showing?
- Feature requires `throttle_ms > 0`
- Check that data actually changes between updates
- Ensure client is checking `changed_fields` property