gRPC Source
The gRPC Source provides a high-performance gRPC service endpoint for submitting data change events to Drasi. It exposes a Protocol Buffer-based API that supports both single event submission and streaming for high-throughput scenarios.
Overview
The gRPC Source is a server-side component that implements the drasi.v1.SourceService gRPC service. External systems can push data changes (insert, update, delete operations) to Drasi using efficient binary serialization and bidirectional streaming via HTTP/2.
Key Capabilities
- Dual submission modes: Unary RPC for single events or streaming RPC for bulk ingestion
- High performance: Binary Protocol Buffers over HTTP/2 with multiplexing
- Type safety: Strongly-typed messages defined in protobuf schemas
- Health monitoring: Built-in health check endpoint
- Bootstrap support: Extensible API for initial data snapshots (future enhancement)
- Error handling: Detailed error responses with validation messages
- Lifecycle management: Graceful startup/shutdown with status tracking
- Flexible dispatch: Configurable channel-based or broadcast dispatch modes
Use Cases
- IoT data ingestion: Streaming sensor data from edge devices
- Microservice integration: Real-time event streaming between services
- ETL pipelines: Bulk data loading with streaming support
- Change data capture: Pushing database changes via gRPC clients
- System integration: Language-agnostic integration using protobuf
- High-volume scenarios: Efficient handling of thousands of events per second
Configuration
Builder Pattern (Recommended)
The builder pattern provides the most ergonomic and type-safe way to construct a gRPC source:
use GrpcSource;
use DispatchMode;
// Basic configuration
let source = builder
.with_host
.with_port
.build?;
// Advanced configuration with custom dispatch settings
let source = builder
.with_host
.with_port
.with_timeout_ms
.with_dispatch_mode
.with_dispatch_buffer_capacity
.with_auto_start
.build?;
// With bootstrap provider
let source = builder
.with_host
.with_port
.with_bootstrap_provider
.build?;
Config Struct Approach
For programmatic configuration or deserialization from external sources:
use ;
// Using config struct
let config = GrpcSourceConfig ;
let source = new?;
// With custom dispatch settings
let source = with_dispatch?;
Direct Integration with DrasiLib
use DrasiLib;
use GrpcSource;
let drasi = builder
.with_id
.build
.await?;
let source = builder
.with_host
.with_port
.build?;
drasi.add_source.await?;
drasi.start_source.await?;
Configuration Options
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| id | Unique identifier for the source instance | String | Any non-empty string | (Required) |
| host | Host address to bind the gRPC server to | String | Valid hostname or IP address (e.g., "0.0.0.0", "127.0.0.1") | "0.0.0.0" |
| port | Port number for the gRPC server | u16 | 1-65535 | 50051 |
| endpoint | Optional custom service endpoint path | Option<String> | Any valid path string | None |
| timeout_ms | Request timeout in milliseconds | u64 | Positive integer (milliseconds) | 5000 |
| dispatch_mode | Event dispatch strategy | Option<DispatchMode> | Channel (isolated, backpressure) or Broadcast (shared, no backpressure) | Channel |
| dispatch_buffer_capacity | Buffer size for dispatch channel | Option<usize> | Positive integer | 1000 |
| bootstrap_provider | Provider for initial data snapshots | Option<Box<dyn BootstrapProvider>> | Any type implementing BootstrapProvider | None |
| auto_start | Whether to start automatically when added to DrasiLib | bool | true, false | true |
Configuration Notes
- Auto-start: When auto_start=true (default), the source starts immediately if added to a running DrasiLib instance. When auto_start=false, start it manually with drasi.start_source("source-id").
- Host binding: Use "0.0.0.0" to accept connections from any network interface, or "127.0.0.1" for localhost only.
- Port: Must be available (not in use by another service).
- Timeout: Applies to request processing; use higher values for slow network conditions.
- Dispatch mode:
  - Channel: Each subscriber gets an isolated channel with backpressure (prevents message loss)
  - Broadcast: Single shared channel across subscribers (faster but may drop messages under load)
- Buffer capacity: Higher values handle bursts better but use more memory.
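The options and defaults listed above can be summarized as a small validation sketch. This is an illustrative Python model, not the crate's GrpcSourceConfig: the class name GrpcSourceOptions and its validate method are hypothetical, and only the field names, defaults, and valid ranges are taken from the options above.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class GrpcSourceOptions:
    """Hypothetical model of the documented options (not the real Rust struct)."""

    id: str                              # required, non-empty
    host: str = "0.0.0.0"
    port: int = 50051                    # u16, valid range 1-65535
    endpoint: Optional[str] = None
    timeout_ms: int = 5000
    dispatch_mode: str = "Channel"       # "Channel" or "Broadcast"
    dispatch_buffer_capacity: int = 1000
    auto_start: bool = True

    def validate(self) -> List[str]:
        """Return a list of problems; an empty list means the options look valid."""
        problems = []
        if not self.id:
            problems.append("id must be a non-empty string")
        if not 1 <= self.port <= 65535:
            problems.append("port must be in 1-65535")
        if self.timeout_ms <= 0:
            problems.append("timeout_ms must be positive")
        if self.dispatch_mode not in ("Channel", "Broadcast"):
            problems.append("dispatch_mode must be Channel or Broadcast")
        if self.dispatch_buffer_capacity <= 0:
            problems.append("dispatch_buffer_capacity must be positive")
        return problems
```

Constructing GrpcSourceOptions(id="my-grpc") with no other arguments yields the documented defaults and an empty problem list.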
Input Schema
The gRPC Source accepts events in Protocol Buffer format as defined in proto/drasi/v1/source.proto and proto/drasi/v1/common.proto.
Core Message Types
SourceChange (Top-level event message)
message SourceChange {
ChangeType type = 1; // INSERT, UPDATE, or DELETE
oneof change {
Element element = 2; // For INSERT/UPDATE operations
ElementMetadata metadata = 3; // For DELETE operations
}
google.protobuf.Timestamp timestamp = 4;
string source_id = 5;
}
enum ChangeType {
CHANGE_TYPE_UNSPECIFIED = 0;
CHANGE_TYPE_INSERT = 1;
CHANGE_TYPE_UPDATE = 2;
CHANGE_TYPE_DELETE = 3;
}
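The oneof rule in SourceChange (an element for INSERT/UPDATE, metadata only for DELETE) can be sketched with plain dictionaries that mirror the message fields. The helper name make_source_change is hypothetical, the optional timestamp field is omitted, and rejecting CHANGE_TYPE_UNSPECIFIED is an assumption rather than confirmed server behavior.

```python
def make_source_change(change_type, source_id, *, element=None, metadata=None):
    """Build a dict shaped like SourceChange, enforcing the oneof rule."""
    if change_type in ("CHANGE_TYPE_INSERT", "CHANGE_TYPE_UPDATE"):
        # INSERT and UPDATE carry the full element (node or relation).
        if element is None or metadata is not None:
            raise ValueError("INSERT/UPDATE must carry 'element' only")
        payload = {"element": element}
    elif change_type == "CHANGE_TYPE_DELETE":
        # DELETE only needs metadata to identify what is removed.
        if metadata is None or element is not None:
            raise ValueError("DELETE must carry 'metadata' only")
        payload = {"metadata": metadata}
    else:
        # Assumption: unspecified or unknown types are treated as invalid.
        raise ValueError(f"unsupported change type: {change_type}")
    return {"type": change_type, **payload, "source_id": source_id}


# Sample metadata; the identifiers are illustrative values.
meta = {
    "reference": {"source_id": "my-grpc-source", "element_id": "user_001"},
    "labels": ["User"],
    "effective_from": 0,
}
insert = make_source_change(
    "CHANGE_TYPE_INSERT",
    "my-grpc-source",
    element={"node": {"metadata": meta, "properties": {"active": True}}},
)
delete = make_source_change("CHANGE_TYPE_DELETE", "my-grpc-source", metadata=meta)
```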
Element (Node or Relation)
message Element {
oneof element {
Node node = 1;
Relation relation = 2;
}
}
Node Element
message Node {
ElementMetadata metadata = 1;
google.protobuf.Struct properties = 2;
}
Fields:
- metadata: Required metadata containing reference, labels, and effective timestamp
- properties: Key-value properties using google.protobuf.Struct (flexible schema)
Relation Element
message Relation {
ElementMetadata metadata = 1;
ElementReference in_node = 2; // Target node (relationship points to)
ElementReference out_node = 3; // Source node (relationship comes from)
google.protobuf.Struct properties = 4;
}
Fields:
- metadata: Required metadata for the relation
- in_node: Reference to the target node (where the arrow points)
- out_node: Reference to the source node (where the arrow originates)
- properties: Key-value properties of the relationship
Direction semantics: (out_node)-[relation]->(in_node)
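Because in_node and out_node are easy to swap, a short sketch may help. It uses a plain dictionary shaped like the Relation message and renders it in arrow notation; the helper name describe_relation and the sample identifiers are illustrative assumptions.

```python
def describe_relation(relation: dict) -> str:
    """Render a Relation-shaped dict as (out_node)-[:LABEL]->(in_node)."""
    out_id = relation["out_node"]["element_id"]   # where the arrow originates
    in_id = relation["in_node"]["element_id"]     # where the arrow points
    label = relation["metadata"]["labels"][0]
    return f"({out_id})-[:{label}]->({in_id})"


# user_001 follows user_002: user_001 is the out_node, user_002 the in_node.
follows = {
    "metadata": {
        "reference": {"source_id": "my-grpc-source", "element_id": "follows_001"},
        "labels": ["FOLLOWS"],
        "effective_from": 0,
    },
    "out_node": {"source_id": "my-grpc-source", "element_id": "user_001"},
    "in_node": {"source_id": "my-grpc-source", "element_id": "user_002"},
    "properties": {},
}
arrow = describe_relation(follows)
```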
ElementMetadata
message ElementMetadata {
ElementReference reference = 1;
repeated string labels = 2;
uint64 effective_from = 3; // Unix timestamp in nanoseconds
}
Fields:
- reference: Unique identifier (source_id + element_id)
- labels: Classification labels for pattern matching (e.g., ["User", "Customer"])
- effective_from: Timestamp in nanoseconds since Unix epoch
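Since effective_from is expressed in nanoseconds rather than seconds or milliseconds, a client typically needs a conversion step. The following is a minimal sketch; the helper name to_effective_from is hypothetical. For the current time, Python's time.time_ns() already returns nanoseconds since the Unix epoch.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_effective_from(moment: datetime) -> int:
    """Convert a timezone-aware datetime to nanoseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    # Integer division on whole microseconds avoids float rounding errors.
    micros = (moment - EPOCH) // timedelta(microseconds=1)
    return micros * 1000


one_second = to_effective_from(datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc))
```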
ElementReference
message ElementReference {
string source_id = 1;
string element_id = 2;
}
Fields:
- source_id: Identifies which source owns this element
- element_id: Unique identifier within the source
Service Methods
The gRPC Source implements the drasi.v1.SourceService:
1. SubmitEvent (Unary RPC)
Submit a single event.
rpc SubmitEvent(SubmitEventRequest) returns (SubmitEventResponse);
message SubmitEventRequest {
SourceChange event = 1;
}
message SubmitEventResponse {
bool success = 1;
string message = 2;
string error = 3;
string event_id = 4; // UUID assigned to the event
}
2. StreamEvents (Bidirectional Streaming RPC)
Stream multiple events for bulk processing.
rpc StreamEvents(stream SourceChange) returns (stream StreamEventResponse);
message StreamEventResponse {
bool success = 1;
string message = 2;
string error = 3;
uint64 events_processed = 4;
}
Behavior:
- Accepts a stream of SourceChange messages
- Returns periodic progress updates (every 100 events)
- Returns final count when stream completes
- Individual event errors don't stop the stream
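The behavior listed above can be modeled with a small generator. This is a simplified sketch of the described semantics, not the server implementation: the function name stream_events, the is_valid callback, and the choice to count only valid events in events_processed are all assumptions.

```python
def stream_events(events, is_valid, progress_every=100):
    """Yield StreamEventResponse-shaped dicts for a stream of incoming events."""
    processed = 0
    for event in events:
        if not is_valid(event):
            # An invalid event is skipped; the stream keeps going.
            continue
        processed += 1
        if processed % progress_every == 0:
            # Periodic progress update.
            yield {"success": True, "message": "progress", "events_processed": processed}
    # Final response once the client has finished sending.
    yield {"success": True, "message": "completed", "events_processed": processed}


# 250 events, one of which (the negative value) is treated as invalid.
incoming = list(range(250))
incoming[5] = -1
responses = list(stream_events(incoming, is_valid=lambda e: e >= 0))
counts = [r["events_processed"] for r in responses]
```

With 249 valid events, the model emits progress responses at 100 and 200 processed events, followed by a final response carrying the total.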
3. RequestBootstrap (Server Streaming RPC)
Request initial data snapshot (extensible for future use).
rpc RequestBootstrap(BootstrapRequest) returns (stream BootstrapResponse);
message BootstrapRequest {
string query_id = 1;
repeated string node_labels = 2;
repeated string relation_labels = 3;
}
message BootstrapResponse {
repeated Element elements = 1;
uint32 total_count = 2;
}
Current behavior: Returns empty stream (placeholder for future implementation)
4. HealthCheck (Unary RPC)
Check service health status.
rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResponse);
message HealthCheckResponse {
enum Status {
STATUS_UNSPECIFIED = 0;
STATUS_HEALTHY = 1;
STATUS_DEGRADED = 2;
STATUS_UNHEALTHY = 3;
}
Status status = 1;
string message = 2;
string version = 3; // Package version
}
Property Value Types
The google.protobuf.Struct supports the following value types:
| Protobuf Type | Drasi ElementValue | Notes |
|---|---|---|
| null_value | Null | Null/missing value |
| bool_value | Bool | Boolean true/false |
| number_value (integer) | Integer | Whole numbers without fractional part |
| number_value (float) | Float | Numbers with fractional part |
| string_value | String | UTF-8 text |
| list_value | String (JSON) | Arrays converted to JSON string |
| struct_value | String (JSON) | Objects converted to JSON string |
Note: Complex types (lists, structs) are serialized as JSON strings for storage in Drasi's graph model.
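The mapping above can be sketched as a small classification function. This is a client-side illustration of the documented rules, not the source's conversion code: the function name to_element_value is hypothetical, and the exact JSON formatting the source produces for lists and structs may differ from json.dumps.

```python
import json


def to_element_value(value):
    """Return (kind, converted_value) following the documented mapping."""
    if value is None:
        return ("Null", None)
    if isinstance(value, bool):
        # Checked before numbers because bool is a subclass of int in Python.
        return ("Bool", value)
    if isinstance(value, (int, float)):
        number = float(value)  # Struct's number_value is always a double
        if number.is_integer():
            return ("Integer", int(number))
        return ("Float", number)
    if isinstance(value, str):
        return ("String", value)
    if isinstance(value, (list, dict)):
        # Complex types are flattened to a JSON string.
        return ("String", json.dumps(value))
    raise TypeError(f"unsupported property type: {type(value).__name__}")
```

Because Struct carries every number as a double, a client cannot force a whole number such as 30.0 to be treated as a Float under these rules.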
Usage Examples
Example 1: Basic Server Setup
use GrpcSource;
use DrasiLib;
async
Example 2: Python Client - Submit Single Node
# Connect to gRPC source
=
=
# Create properties
=
=
=
= 30
= True
# Create metadata
=
# Create node
=
# Create source change
=
# Submit event
=
=
Example 3: Python Client - Stream Events
=
=
"""Generate 1000 user nodes"""
=
= f
=
= % 2 == 0
=
=
yield
# Stream events
=
Example 4: Go Client - Insert Relation
package main
import (
"context"
"log"
"time"
pb "your-module/drasi/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/structpb"
)
func main()
Example 5: Rust Client - Update and Delete
use Channel;
use ;
use HashMap;
use ;
async
Example 6: Testing with grpcurl
# Install grpcurl
# List available services
# Health check
# Submit a node insert event
Integration with Drasi Queries
Events submitted through the gRPC source flow into Drasi's continuous query engine where they can be matched against Cypher patterns.
Label Matching
// Submit a node with labels ["User", "Premium"]
// This will match Cypher patterns like:
// MATCH (u:User) ...
// MATCH (p:Premium) ...
// MATCH (u:User:Premium) ...
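The label rule illustrated above is a subset test: a pattern matches when every label it names is present on the node. A minimal sketch, with the hypothetical helper matches_labels standing in for the query engine's own matching:

```python
def matches_labels(node_labels, pattern_labels):
    """True when every label required by the pattern is present on the node."""
    return set(pattern_labels) <= set(node_labels)


node = ["User", "Premium"]
results = {
    "(u:User)": matches_labels(node, ["User"]),
    "(p:Premium)": matches_labels(node, ["Premium"]),
    "(u:User:Premium)": matches_labels(node, ["User", "Premium"]),
    "(a:Admin)": matches_labels(node, ["Admin"]),
}
```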
Property Filtering
// Submit a node with property active=true
// This will be filtered by Cypher WHERE clauses:
// MATCH (u:User) WHERE u.active = true
// MATCH (u:User) WHERE u.age > 18
Relationship Traversal
// Submit a relation (user_001)-[:FOLLOWS]->(user_002)
// This enables Cypher patterns:
// MATCH (a:User)-[:FOLLOWS]->(b:User)
// MATCH (a)-[f:FOLLOWS]->(b) WHERE f.strength > 0.8
Advanced Topics
Dispatch Modes
Channel Mode (Default):
- Each subscriber gets an isolated channel
- Provides backpressure (blocks if subscriber is slow)
- Prevents message loss for slow subscribers (events are still dropped when no query is subscribed)
- Higher memory usage under load
Broadcast Mode:
- Single shared channel for all subscribers
- No backpressure (continues if subscriber is slow)
- May drop messages if buffer fills
- Lower latency and memory usage
let source = builder
.with_dispatch_mode
.with_dispatch_buffer_capacity
.build?;
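The trade-off between the two modes can be illustrated with a toy model built on Python's standard library. It is not how the source is implemented: the class names are hypothetical, the Channel model reports a full buffer instead of actually blocking the sender, and evicting the oldest entry in Broadcast mode is an assumption.

```python
import queue
from collections import deque


class ChannelDispatch:
    """Bounded queue: a full buffer pushes back on the sender."""

    def __init__(self, capacity):
        self._queue = queue.Queue(maxsize=capacity)

    def try_send(self, event) -> bool:
        """Return False when full; a real sender would wait here instead."""
        try:
            self._queue.put_nowait(event)
            return True
        except queue.Full:
            return False

    def drain(self):
        items = []
        while not self._queue.empty():
            items.append(self._queue.get_nowait())
        return items


class BroadcastDispatch:
    """Fixed-size ring: sending never waits, but old events can be lost."""

    def __init__(self, capacity):
        self._buffer = deque(maxlen=capacity)
        self.dropped = 0

    def send(self, event):
        if len(self._buffer) == self._buffer.maxlen:
            self.dropped += 1  # the oldest buffered event is about to be evicted
        self._buffer.append(event)

    def drain(self):
        items = list(self._buffer)
        self._buffer.clear()
        return items
```

With a capacity of 2 and three events sent before any are consumed, the Channel model refuses the third event while keeping the first two, whereas the Broadcast model accepts all three sends and loses the first event.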
Bootstrap Providers
Enable initial data snapshots for queries:
use BootstrapProvider;
let source = builder
.with_bootstrap_provider
.build?;
Profiling and Performance
The gRPC source includes built-in profiling metadata:
// Automatically tracked:
// - source_send_ns: Timestamp when event enters the source
// - Propagated through the query pipeline
// - Available in reactions for end-to-end latency measurement
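A reaction that receives source_send_ns could derive end-to-end latency as sketched below. The helper name is hypothetical, and the sketch assumes source_send_ns is a wall-clock timestamp in nanoseconds since the Unix epoch, which the text above does not state explicitly.

```python
import time


def end_to_end_latency_ms(source_send_ns, now_ns=None):
    """Milliseconds elapsed since the event entered the source."""
    if now_ns is None:
        now_ns = time.time_ns()  # current wall-clock time in nanoseconds
    return (now_ns - source_send_ns) / 1_000_000


# Fixed timestamps keep the example deterministic.
latency = end_to_end_latency_ms(1_000_000_000, now_ns=1_002_500_000)
```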
Error Handling
Validation Errors:
- Missing required fields (metadata, in_node, out_node)
- Invalid change types
- Malformed protobuf messages
Response contains:
SubmitEventResponse
Streaming behavior:
- Individual errors don't stop the stream
- Valid events continue processing
- Periodic responses include error counts
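Clients can catch most of the validation errors listed above before sending. The following pre-flight check is a sketch over dictionaries shaped like SourceChange; the function name validate_change is hypothetical, and the error strings other than the Node metadata message (which appears in the Troubleshooting section) are illustrative.

```python
VALID_TYPES = {"CHANGE_TYPE_INSERT", "CHANGE_TYPE_UPDATE", "CHANGE_TYPE_DELETE"}


def validate_change(change: dict) -> list:
    """Return validation error strings; an empty list means no problem was found."""
    errors = []
    if change.get("type") not in VALID_TYPES:
        errors.append("Validation error: invalid change type")
    element = change.get("element") or {}
    node = element.get("node")
    relation = element.get("relation")
    if node is not None and not node.get("metadata"):
        errors.append("Validation error: Node element missing required 'metadata' field")
    if relation is not None:
        # Relations need metadata plus both endpoint references.
        for field in ("metadata", "in_node", "out_node"):
            if not relation.get(field):
                errors.append(
                    f"Validation error: Relation element missing required '{field}' field"
                )
    return errors
```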
Graceful Shutdown
// The source handles shutdown signals gracefully:
// 1. Stops accepting new connections
// 2. Completes in-flight requests
// 3. Closes channels
// 4. Releases resources
drasi.stop_source.await?;
Generating Client Code
Python
# Install tools
# Generate code
Go
# Install tools
# Generate code
Rust
Add to your build.rs:
JavaScript/TypeScript
# Install tools
# Generate code
Performance Guidelines
When to Use SubmitEvent (Unary)
- Low to moderate event rates (< 100 events/second)
- Need immediate per-event confirmation
- Interactive applications with user feedback
- Critical events requiring acknowledgment
- Testing and debugging
When to Use StreamEvents (Streaming)
- High-volume ingestion (> 1000 events/second)
- Bulk data imports or migrations
- ETL pipelines with batching
- IoT sensor streams
- Log aggregation
- When network efficiency matters
Best Practices
- Connection reuse: Create one gRPC channel and reuse for all requests
- Stream for bulk: Use streaming for bulk operations (10x+ faster)
- Buffer sizing: Tune dispatch_buffer_capacity based on throughput
- Connection pooling: For high concurrency scenarios
- Keep-alive: Configure gRPC keep-alive for long-lived connections
- Batch client-side: Accumulate events before streaming
- Monitor metrics: Track event processing rates and latency
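For the client-side batching practice, one possible approach is to group an event iterator into fixed-size chunks and open one stream per chunk. The helper below is a generic sketch with a hypothetical name; it only groups events and does not call any gRPC API.

```python
from itertools import islice


def batched(events, size):
    """Yield lists of up to `size` events from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(events)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return  # iterator exhausted
        yield batch


chunks = list(batched(range(5), 2))
```

The last chunk may be shorter than the requested size, so a consumer should not assume every batch is full.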
Performance Benchmarks (Approximate)
| Scenario | Events/sec | Notes |
|---|---|---|
| Unary RPC (single event) | 500-1000 | Round-trip per event |
| Streaming (batched) | 10,000-50,000 | Depends on event size |
| Streaming (small events) | 100,000+ | Minimal properties |
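To see what these approximate figures imply for a bulk load, the arithmetic can be worked through for one million events. The rates are taken from the table above (upper end of the unary range, lower end of batched streaming); actual throughput depends on event size and environment.

```python
def load_time_seconds(event_count, events_per_second):
    """Time to ingest event_count events at a steady rate."""
    return event_count / events_per_second


unary_s = load_time_seconds(1_000_000, 1_000)     # about 16.7 minutes
stream_s = load_time_seconds(1_000_000, 10_000)   # about 1.7 minutes
speedup = unary_s / stream_s
```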
Comparison with HTTP Source
| Feature | gRPC Source | HTTP Source |
|---|---|---|
| Protocol | HTTP/2 + Protocol Buffers | HTTP/1.1 + JSON |
| Type Safety | Strongly typed (protobuf) | JSON schema validation |
| Performance | Higher throughput, lower latency | Good for moderate loads |
| Streaming | Native bidirectional streaming | Batch POST endpoint |
| Message Size | Smaller (binary encoding) | Larger (text JSON) |
| Client Generation | Auto-generated from .proto files | Manual or OpenAPI codegen |
| Browser Support | Limited (requires grpc-web proxy) | Native browser support |
| Debugging | Requires gRPC tools (grpcurl, Postman) | Standard HTTP tools (curl) |
| Best For | High-volume, microservices, IoT | Web apps, simple integration |
Security Considerations
The gRPC source currently runs without authentication or encryption. For production deployments:
TLS/SSL
// Future enhancement: TLS configuration
// Configure server TLS certificates
// Enforce encrypted connections
Authentication
// Future enhancement: Authentication interceptors
// Implement token-based auth
// Add API key validation
Network Security
- Use firewall rules to restrict access
- Deploy behind a reverse proxy (Envoy, Nginx)
- Use Kubernetes network policies
- Implement rate limiting at proxy level
Monitoring
- Track request rates and patterns
- Alert on unusual traffic spikes
- Log failed authentication attempts (at the proxy, or in the source once authentication is available)
- Monitor resource usage
Troubleshooting
Connection Refused
Error: Connection refused
Solutions:
- Verify source is started: drasi.start_source("my-grpc").await?
- Check port availability: lsof -i :50051
- Verify host binding (use "0.0.0.0" not "localhost" for external access)
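Where lsof is not available, port availability can also be probed from a script. The sketch below tries to bind the address and reports whether that succeeds; the helper name port_is_free is hypothetical, and the result is only a snapshot because another process could claim the port right after the check.

```python
import socket


def port_is_free(host, port):
    """Return True if a TCP socket can currently bind to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
            return True
        except OSError:
            # Typically "address already in use" or a permission error.
            return False
```

Calling port_is_free("0.0.0.0", 50051) before starting the source indicates whether the default port is likely to bind.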
Invalid Event Data
SubmitEventResponse { success: false, error: "Validation error: Node element missing required 'metadata' field" }
Solutions:
- Ensure all required fields are populated
- Check protobuf message structure
- Validate element_id is not empty
- Verify labels array is not empty
Slow Event Processing
Symptoms: High latency, timeouts
Solutions:
- Increase dispatch_buffer_capacity
- Use streaming instead of unary calls
- Check query complexity
- Monitor system resources
No Subscribers
[my-grpc-source] Failed to dispatch (no subscribers): No subscribers available
This is normal: Events are dropped if no queries are subscribed. Add a query:
let query = cypher
.query
.from_source
.build;
drasi.add_query.await?;
Developer Notes
Source Code Structure
- src/lib.rs: Main source implementation and gRPC service
- src/config.rs: Configuration structs and validation
- src/tests.rs: Unit tests
- build.rs: Protobuf compilation
- proto/drasi/v1/: Protobuf schema definitions
Testing
# Run unit tests
# Run with logging
RUST_LOG=debug
# Test specific module
Contributing
When modifying the gRPC source:
- Update protobuf schemas in proto/drasi/v1/
- Regenerate code: cargo build (runs build.rs)
- Add tests for new functionality
- Update this README with examples
- Run cargo clippy and cargo fmt
License
Licensed under the Apache License, Version 2.0. See the LICENSE file for details.
Related Components
- HTTP Source: REST API alternative for web applications
- PostgreSQL Source: CDC from PostgreSQL databases
- Application Source: Programmatic event submission in Rust
- Query Engine: Continuous Cypher query evaluation
- Reactions: Output handlers for query results