drasi-source-grpc 0.1.2

gRPC source plugin for Drasi
# gRPC Source

The gRPC Source provides a high-performance gRPC service endpoint for submitting data change events to Drasi. It exposes a Protocol Buffer-based API that supports both single event submission and streaming for high-throughput scenarios.

## Overview

The gRPC Source is a server-side component that implements the `drasi.v1.SourceService` gRPC service. External systems can push data changes (insert, update, delete operations) to Drasi using efficient binary serialization and bidirectional streaming via HTTP/2.

### Key Capabilities

- **Dual submission modes**: Unary RPC for single events or a streaming RPC for bulk ingestion
- **High performance**: Binary Protocol Buffers over HTTP/2 with multiplexing
- **Type safety**: Strongly-typed messages defined in protobuf schemas
- **Health monitoring**: Built-in health check endpoint
- **Bootstrap support**: Extensible API for initial data snapshots (future enhancement)
- **Error handling**: Detailed error responses with validation messages
- **Lifecycle management**: Graceful startup/shutdown with status tracking
- **Flexible dispatch**: Configurable channel-based or broadcast dispatch modes

### Use Cases

- **IoT data ingestion**: Streaming sensor data from edge devices
- **Microservice integration**: Real-time event streaming between services
- **ETL pipelines**: Bulk data loading with streaming support
- **Change data capture**: Pushing database changes via gRPC clients
- **System integration**: Language-agnostic integration using protobuf
- **High-volume scenarios**: Efficient handling of thousands of events per second

## Configuration

### Builder Pattern (Recommended)

The builder pattern provides the most ergonomic and type-safe way to construct a gRPC source:

```rust
use drasi_source_grpc::GrpcSource;
use drasi_lib::channels::DispatchMode;

// Basic configuration
let source = GrpcSource::builder("my-grpc-source")
    .with_host("0.0.0.0")
    .with_port(50051)
    .build()?;

// Advanced configuration with custom dispatch settings
let source = GrpcSource::builder("production-grpc")
    .with_host("0.0.0.0")
    .with_port(50051)
    .with_timeout_ms(10000)
    .with_dispatch_mode(DispatchMode::Channel)
    .with_dispatch_buffer_capacity(2000)
    .with_auto_start(true)
    .build()?;

// With bootstrap provider
let source = GrpcSource::builder("bootstrapped-grpc")
    .with_host("127.0.0.1")
    .with_port(50052)
    .with_bootstrap_provider(my_bootstrap_provider)
    .build()?;
```

### Config Struct Approach

For programmatic configuration or deserialization from external sources:

```rust
use drasi_lib::channels::DispatchMode;
use drasi_source_grpc::{GrpcSource, GrpcSourceConfig};

// Using config struct
let config = GrpcSourceConfig {
    host: "0.0.0.0".to_string(),
    port: 50051,
    endpoint: None,
    timeout_ms: 5000,
};

// `config` is moved into the source, so clone it if it is needed again
// (this assumes `GrpcSourceConfig` implements `Clone`)
let source = GrpcSource::new("my-grpc-source", config.clone())?;

// With custom dispatch settings
let source = GrpcSource::with_dispatch(
    "my-grpc-source",
    config,
    Some(DispatchMode::Channel),
    Some(1500)
)?;
```

### Direct Integration with DrasiLib

```rust
use drasi_lib::DrasiLib;
use drasi_source_grpc::GrpcSource;

let drasi = DrasiLib::builder()
    .with_id("my-drasi-instance")
    .build()
    .await?;

let source = GrpcSource::builder("events-grpc")
    .with_host("0.0.0.0")
    .with_port(50051)
    .build()?;

drasi.add_source(source).await?;
drasi.start_source("events-grpc").await?;
```

## Configuration Options

| Name | Description | Data Type | Valid Values | Default |
|------|-------------|-----------|--------------|---------|
| `id` | Unique identifier for the source instance | `String` | Any non-empty string | **(Required)** |
| `host` | Host address to bind the gRPC server to | `String` | Valid hostname or IP address (e.g., "0.0.0.0", "127.0.0.1") | `"0.0.0.0"` |
| `port` | Port number for the gRPC server | `u16` | 1-65535 | `50051` |
| `endpoint` | Optional custom service endpoint path | `Option<String>` | Any valid path string | `None` |
| `timeout_ms` | Request timeout in milliseconds | `u64` | Positive integer (milliseconds) | `5000` |
| `dispatch_mode` | Event dispatch strategy | `Option<DispatchMode>` | `Channel` (isolated, backpressure) or `Broadcast` (shared, no backpressure) | `Channel` |
| `dispatch_buffer_capacity` | Buffer size for dispatch channel | `Option<usize>` | Positive integer | `1000` |
| `bootstrap_provider` | Provider for initial data snapshots | `Option<Box<dyn BootstrapProvider>>` | Any type implementing `BootstrapProvider` | `None` |
| `auto_start` | Whether to start automatically when added to DrasiLib | `bool` | `true`, `false` | `true` |

### Configuration Notes

- **Auto-start**: When `auto_start=true` (default), the source starts immediately if added to a running DrasiLib instance. When `auto_start=false`, start manually with `drasi.start_source("source-id")`.
- **Host binding**: Use `"0.0.0.0"` to accept connections from any network interface, or `"127.0.0.1"` for localhost only
- **Port**: Must be available (not in use by another service)
- **Timeout**: Applies to request processing; use higher values under slow network conditions
- **Dispatch mode**:
  - `Channel`: Each subscriber gets an isolated channel with backpressure (prevents message loss)
  - `Broadcast`: Single shared channel across subscribers (faster but may drop messages under load)
- **Buffer capacity**: Higher values handle bursts better but use more memory
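The host-binding and port rules above can be checked before the source is started. The following is a minimal sketch in Python (the language of the client examples in this README), using only the standard `ipaddress` module. `describe_bind` is a hypothetical helper for illustration and is not part of the Drasi API.

```python
import ipaddress


def describe_bind(host: str, port: int) -> str:
    """Classify a host/port pair using the rules from the configuration notes."""
    if not 1 <= port <= 65535:
        raise ValueError(f"port {port} is outside 1-65535")
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal, so treat it as a hostname to be resolved later.
        return f"{host}:{port} (hostname)"
    if address.is_unspecified:
        return f"{host}:{port} (all interfaces)"
    if address.is_loopback:
        return f"{host}:{port} (localhost only)"
    return f"{host}:{port} (single interface)"


if __name__ == "__main__":
    print(describe_bind("0.0.0.0", 50051))
    print(describe_bind("127.0.0.1", 50051))
```

A check like this only validates the values; whether the port is actually free still has to be confirmed on the host.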

## Input Schema

The gRPC Source accepts events in Protocol Buffer format as defined in `proto/drasi/v1/source.proto` and `proto/drasi/v1/common.proto`.

### Core Message Types

#### SourceChange (Top-level event message)

```protobuf
message SourceChange {
    ChangeType type = 1;              // INSERT, UPDATE, or DELETE
    oneof change {
        Element element = 2;          // For INSERT/UPDATE operations
        ElementMetadata metadata = 3; // For DELETE operations
    }
    google.protobuf.Timestamp timestamp = 4;
    string source_id = 5;
}

enum ChangeType {
    CHANGE_TYPE_UNSPECIFIED = 0;
    CHANGE_TYPE_INSERT = 1;
    CHANGE_TYPE_UPDATE = 2;
    CHANGE_TYPE_DELETE = 3;
}
```

#### Element (Node or Relation)

```protobuf
message Element {
    oneof element {
        Node node = 1;
        Relation relation = 2;
    }
}
```

#### Node Element

```protobuf
message Node {
    ElementMetadata metadata = 1;
    google.protobuf.Struct properties = 2;
}
```

**Fields:**
- `metadata`: Required metadata containing reference, labels, and effective timestamp
- `properties`: Key-value properties using `google.protobuf.Struct` (flexible schema)

#### Relation Element

```protobuf
message Relation {
    ElementMetadata metadata = 1;
    ElementReference in_node = 2;    // Target node (relationship points to)
    ElementReference out_node = 3;   // Source node (relationship comes from)
    google.protobuf.Struct properties = 4;
}
```

**Fields:**
- `metadata`: Required metadata for the relation
- `in_node`: Reference to the target node (where the arrow points)
- `out_node`: Reference to the source node (where the arrow originates)
- `properties`: Key-value properties of the relationship

**Direction semantics:** `(out_node)-[relation]->(in_node)`
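Because `in_node` and `out_node` are easy to swap, it can help to render a relation as an arrow before submitting it. The sketch below is illustrative only: `Ref` is a stand-in for the generated `ElementReference` class, and `as_cypher_arrow` is a hypothetical helper.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Ref:
    """Stand-in for ElementReference (source_id + element_id)."""
    source_id: str
    element_id: str


def as_cypher_arrow(label: str, out_node: Ref, in_node: Ref) -> str:
    """Render a relation so the arrow leaves out_node and enters in_node."""
    return f"({out_node.element_id})-[:{label}]->({in_node.element_id})"


if __name__ == "__main__":
    alice = Ref("my-grpc-source", "user_001")
    bob = Ref("my-grpc-source", "user_002")
    # Alice FOLLOWS Bob: Alice is the out_node, Bob is the in_node.
    print(as_cypher_arrow("FOLLOWS", out_node=alice, in_node=bob))
```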

#### ElementMetadata

```protobuf
message ElementMetadata {
    ElementReference reference = 1;
    repeated string labels = 2;
    uint64 effective_from = 3;  // Unix timestamp in nanoseconds
}
```

**Fields:**
- `reference`: Unique identifier (source_id + element_id)
- `labels`: Classification labels for pattern matching (e.g., ["User", "Customer"])
- `effective_from`: Timestamp in nanoseconds since Unix epoch
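Since `effective_from` is an integer count of nanoseconds, it is worth computing it with integer arithmetic. The sketch below uses only the Python standard library; `to_unix_nanos` is a hypothetical helper name. Multiplying a float from `time.time()` by `1e9` rounds to a few hundred nanoseconds at current epoch values, which `time.time_ns()` avoids.

```python
import time
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_nanos(moment: datetime) -> int:
    """Convert a timezone-aware datetime to integer nanoseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    delta = moment - EPOCH
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * 1_000_000_000 + delta.microseconds * 1_000


if __name__ == "__main__":
    # Current time, already in integer nanoseconds.
    print(time.time_ns())
    # A fixed instant, e.g. when replaying historical changes.
    print(to_unix_nanos(datetime(2009, 2, 13, 23, 31, 30, tzinfo=timezone.utc)))
```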

#### ElementReference

```protobuf
message ElementReference {
    string source_id = 1;
    string element_id = 2;
}
```

**Fields:**
- `source_id`: Identifies which source owns this element
- `element_id`: Unique identifier within the source

### Service Methods

The gRPC Source implements the `drasi.v1.SourceService`:

#### 1. SubmitEvent (Unary RPC)

Submit a single event.

```protobuf
rpc SubmitEvent(SubmitEventRequest) returns (SubmitEventResponse);

message SubmitEventRequest {
    SourceChange event = 1;
}

message SubmitEventResponse {
    bool success = 1;
    string message = 2;
    string error = 3;
    string event_id = 4;  // UUID assigned to the event
}
```

#### 2. StreamEvents (Bidirectional Streaming RPC)

Stream multiple events for bulk processing.

```protobuf
rpc StreamEvents(stream SourceChange) returns (stream StreamEventResponse);

message StreamEventResponse {
    bool success = 1;
    string message = 2;
    string error = 3;
    uint64 events_processed = 4;
}
```

**Behavior:**
- Accepts a stream of `SourceChange` messages
- Returns periodic progress updates (every 100 events)
- Returns final count when stream completes
- Individual event errors don't stop the stream
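On the client side, the behavior above means the response stream mixes progress updates with per-event failures. One way to consume it is sketched below. The `StreamEventResponse` dataclass is a stand-in that mirrors the protobuf fields so the example runs without generated code, and `summarize` is a hypothetical helper. It assumes `events_processed` is a running total.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple


@dataclass
class StreamEventResponse:
    """Stand-in mirroring the fields of the protobuf message."""
    success: bool
    message: str = ""
    error: str = ""
    events_processed: int = 0


def summarize(responses: Iterable[StreamEventResponse]) -> Tuple[int, List[str]]:
    """Return the highest reported event count and every error seen."""
    processed = 0
    errors: List[str] = []
    for response in responses:
        # Progress updates arrive periodically; keep the largest count.
        processed = max(processed, response.events_processed)
        if not response.success:
            # An error does not end the stream, so record it and continue.
            errors.append(response.error)
    return processed, errors


if __name__ == "__main__":
    stream = [
        StreamEventResponse(True, events_processed=100),
        StreamEventResponse(False, error="Validation error", events_processed=200),
        StreamEventResponse(True, events_processed=250),
    ]
    print(summarize(stream))
```

With the real stub, the iterator returned by `stub.StreamEvents(...)` would take the place of the `stream` list.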

#### 3. RequestBootstrap (Server Streaming RPC)

Request initial data snapshot (extensible for future use).

```protobuf
rpc RequestBootstrap(BootstrapRequest) returns (stream BootstrapResponse);

message BootstrapRequest {
    string query_id = 1;
    repeated string node_labels = 2;
    repeated string relation_labels = 3;
}

message BootstrapResponse {
    repeated Element elements = 1;
    uint32 total_count = 2;
}
```

**Current behavior:** Returns an empty stream (placeholder for a future implementation)

#### 4. HealthCheck (Unary RPC)

Check service health status.

```protobuf
rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResponse);

message HealthCheckResponse {
    enum Status {
        STATUS_UNSPECIFIED = 0;
        STATUS_HEALTHY = 1;
        STATUS_DEGRADED = 2;
        STATUS_UNHEALTHY = 3;
    }

    Status status = 1;
    string message = 2;
    string version = 3;  // Package version
}
```

### Property Value Types

The `google.protobuf.Struct` supports the following value types:

| Protobuf Type | Drasi ElementValue | Notes |
|---------------|-------------------|-------|
| `null_value` | `Null` | Null/missing value |
| `bool_value` | `Bool` | Boolean true/false |
| `number_value` (integer) | `Integer` | Whole numbers without fractional part |
| `number_value` (float) | `Float` | Numbers with fractional part |
| `string_value` | `String` | UTF-8 text |
| `list_value` | `String` (JSON) | Arrays converted to JSON string |
| `struct_value` | `String` (JSON) | Objects converted to JSON string |

**Note:** Complex types (lists, structs) are serialized as JSON strings for storage in Drasi's graph model.
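The mapping in the table can be sketched as a small classifier. This is an illustration of the table, not the source's actual conversion code. It assumes that "no fractional part" corresponds to `float.is_integer()`, and the JSON text produced by `json.dumps` may differ in spacing from what the source emits. `to_element_value` is a hypothetical name.

```python
import json
from typing import Any, Tuple


def to_element_value(value: Any) -> Tuple[str, Any]:
    """Map a Struct-style Python value to a (kind, value) pair per the table."""
    if value is None:
        return ("Null", None)
    if isinstance(value, bool):
        # bool is checked before numbers because bool is a subclass of int.
        return ("Bool", value)
    if isinstance(value, (int, float)):
        number = float(value)  # Struct stores every number as a double
        if number.is_integer():
            return ("Integer", int(number))
        return ("Float", number)
    if isinstance(value, str):
        return ("String", value)
    if isinstance(value, (list, dict)):
        # Lists and nested structs become JSON text.
        return ("String", json.dumps(value))
    raise TypeError(f"unsupported property type: {type(value).__name__}")


if __name__ == "__main__":
    for sample in (None, True, 30, 0.85, "Alice", [1, "a"], {"k": 1}):
        print(sample, "->", to_element_value(sample))
```

One practical consequence is that a property sent as `30.0` and one sent as `30` are indistinguishable after transport, since `google.protobuf.Struct` has a single numeric type.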

## Usage Examples

### Example 1: Basic Server Setup

```rust
use drasi_source_grpc::GrpcSource;
use drasi_lib::DrasiLib;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create Drasi instance
    let drasi = DrasiLib::builder()
        .with_id("my-instance")
        .build()
        .await?;

    // Create and start gRPC source
    let source = GrpcSource::builder("events")
        .with_host("0.0.0.0")
        .with_port(50051)
        .build()?;

    drasi.add_source(source).await?;
    drasi.start_source("events").await?;

    println!("gRPC source listening on 0.0.0.0:50051");

    // Keep server running
    tokio::signal::ctrl_c().await?;

    Ok(())
}
```

### Example 2: Python Client - Submit Single Node

```python
import grpc
from google.protobuf import struct_pb2
import drasi.v1.source_pb2 as source_pb2
import drasi.v1.source_pb2_grpc as source_pb2_grpc
import drasi.v1.common_pb2 as common_pb2
import time

# Connect to gRPC source
channel = grpc.insecure_channel('localhost:50051')
stub = source_pb2_grpc.SourceServiceStub(channel)

# Create properties
props = struct_pb2.Struct()
props['name'] = 'Alice'
props['email'] = 'alice@example.com'
props['age'] = 30
props['active'] = True

# Create metadata
metadata = common_pb2.ElementMetadata(
    reference=common_pb2.ElementReference(
        source_id="my-grpc-source",
        element_id="user_001"
    ),
    labels=["User", "Customer"],
    effective_from=time.time_ns()  # Integer nanoseconds since the Unix epoch
)

# Create node
node = common_pb2.Node(
    metadata=metadata,
    properties=props
)

# Create source change
change = common_pb2.SourceChange(
    type=common_pb2.CHANGE_TYPE_INSERT,
    element=common_pb2.Element(node=node)
)

# Submit event
request = source_pb2.SubmitEventRequest(event=change)
response = stub.SubmitEvent(request)

print(f"Success: {response.success}")
print(f"Message: {response.message}")
print(f"Event ID: {response.event_id}")
```

### Example 3: Python Client - Stream Events

```python
import grpc
from google.protobuf import struct_pb2
import drasi.v1.source_pb2_grpc as source_pb2_grpc
import drasi.v1.common_pb2 as common_pb2
import time

channel = grpc.insecure_channel('localhost:50051')
stub = source_pb2_grpc.SourceServiceStub(channel)

def event_generator():
    """Generate 1000 user nodes"""
    for i in range(1000):
        props = struct_pb2.Struct()
        props['name'] = f'User {i}'
        props['index'] = i
        props['active'] = i % 2 == 0

        metadata = common_pb2.ElementMetadata(
            reference=common_pb2.ElementReference(
                source_id="my-grpc-source",
                element_id=f"user_{i:04d}"
            ),
            labels=["User"],
            effective_from=time.time_ns()  # Integer nanoseconds since the Unix epoch
        )

        node = common_pb2.Node(metadata=metadata, properties=props)

        yield common_pb2.SourceChange(
            type=common_pb2.CHANGE_TYPE_INSERT,
            element=common_pb2.Element(node=node)
        )

# Stream events
responses = stub.StreamEvents(event_generator())

for response in responses:
    print(f"Processed: {response.events_processed} events")
    if not response.success:
        print(f"Error: {response.error}")

print("Stream completed")
```

### Example 4: Go Client - Insert Relation

```go
package main

import (
    "context"
    "log"
    "time"

    pb "your-module/drasi/v1"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/protobuf/types/known/structpb"
)

func main() {
    // Connect to gRPC source (plaintext; grpc.WithInsecure is deprecated)
    conn, err := grpc.Dial(
        "localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewSourceServiceClient(conn)

    // Create relation properties
    properties, err := structpb.NewStruct(map[string]interface{}{
        "since": "2024-01-01",
        "strength": 0.85,
    })
    if err != nil {
        log.Fatalf("Failed to build properties: %v", err)
    }

    // Create relation (Alice FOLLOWS Bob)
    event := &pb.SourceChange{
        Type: pb.ChangeType_CHANGE_TYPE_INSERT,
        Change: &pb.SourceChange_Element{
            Element: &pb.Element{
                Element: &pb.Element_Relation{
                    Relation: &pb.Relation{
                        Metadata: &pb.ElementMetadata{
                            Reference: &pb.ElementReference{
                                SourceId: "my-grpc-source",
                                ElementId: "follows_001",
                            },
                            Labels: []string{"FOLLOWS"},
                            EffectiveFrom: uint64(time.Now().UnixNano()),
                        },
                        OutNode: &pb.ElementReference{
                            SourceId: "my-grpc-source",
                            ElementId: "user_001",  // Alice
                        },
                        InNode: &pb.ElementReference{
                            SourceId: "my-grpc-source",
                            ElementId: "user_002",  // Bob
                        },
                        Properties: properties,
                    },
                },
            },
        },
    }

    // Submit event
    resp, err := client.SubmitEvent(context.Background(), &pb.SubmitEventRequest{
        Event: event,
    })

    if err != nil {
        log.Fatalf("Failed to submit: %v", err)
    }

    log.Printf("Success: %v, Message: %s", resp.Success, resp.Message)
}
```

### Example 5: Rust Client - Update and Delete

```rust
use tonic::transport::Channel;
use prost_types::{Struct, Value};
use std::collections::HashMap;

pub mod drasi {
    pub mod v1 {
        tonic::include_proto!("drasi.v1");
    }
}

use drasi::v1::{
    source_service_client::SourceServiceClient,
    SubmitEventRequest, SourceChange, Element, Node, ElementMetadata,
    ElementReference, ChangeType,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = SourceServiceClient::connect("http://localhost:50051").await?;

    // Update example
    let mut update_props = HashMap::new();
    update_props.insert(
        "email".to_string(),
        Value {
            kind: Some(prost_types::value::Kind::StringValue(
                "alice.new@example.com".to_string()
            )),
        },
    );
    update_props.insert(
        "verified".to_string(),
        Value {
            kind: Some(prost_types::value::Kind::BoolValue(true)),
        },
    );

    let update_event = SourceChange {
        r#type: ChangeType::Update as i32,
        change: Some(drasi::v1::source_change::Change::Element(Element {
            element: Some(drasi::v1::element::Element::Node(Node {
                metadata: Some(ElementMetadata {
                    reference: Some(ElementReference {
                        source_id: "my-grpc-source".to_string(),
                        element_id: "user_001".to_string(),
                    }),
                    labels: vec!["User".to_string(), "Verified".to_string()],
                    // timestamp_nanos() is deprecated in recent chrono releases
                    effective_from: chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64,
                }),
                properties: Some(Struct { fields: update_props }),
            })),
        })),
        timestamp: None,
        source_id: String::new(),
    };

    let response = client.submit_event(SubmitEventRequest {
        event: Some(update_event),
    }).await?;

    println!("Update: {}", response.into_inner().message);

    // Delete example
    let delete_event = SourceChange {
        r#type: ChangeType::Delete as i32,
        change: Some(drasi::v1::source_change::Change::Metadata(ElementMetadata {
            reference: Some(ElementReference {
                source_id: "my-grpc-source".to_string(),
                element_id: "user_999".to_string(),
            }),
            labels: vec!["User".to_string()],
            // timestamp_nanos() is deprecated in recent chrono releases
            effective_from: chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64,
        })),
        timestamp: None,
        source_id: String::new(),
    };

    let response = client.submit_event(SubmitEventRequest {
        event: Some(delete_event),
    }).await?;

    println!("Delete: {}", response.into_inner().message);

    Ok(())
}
```

### Example 6: Testing with grpcurl

```bash
# Install grpcurl
go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest

# List available services
grpcurl -plaintext localhost:50051 list

# Health check
grpcurl -plaintext localhost:50051 drasi.v1.SourceService/HealthCheck

# Submit a node insert event
grpcurl -plaintext -d '{
  "event": {
    "type": "CHANGE_TYPE_INSERT",
    "element": {
      "node": {
        "metadata": {
          "reference": {
            "sourceId": "my-grpc-source",
            "elementId": "test_001"
          },
          "labels": ["TestNode"],
          "effectiveFrom": "1234567890000000000"
        },
        "properties": {
          "name": "Test Item",
          "value": 42
        }
      }
    }
  }
}' localhost:50051 drasi.v1.SourceService/SubmitEvent
```

## Integration with Drasi Queries

Events submitted through the gRPC source flow into Drasi's continuous query engine where they can be matched against Cypher patterns.

### Label Matching

```rust
// Submit a node with labels ["User", "Premium"]
// This will match Cypher patterns like:
// MATCH (u:User) ...
// MATCH (p:Premium) ...
// MATCH (u:User:Premium) ...
```
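In Cypher, a node pattern matches when every label in the pattern is present on the node; extra labels on the node do not prevent a match. The check reduces to a subset test, sketched below with a hypothetical `labels_match` helper.

```python
from typing import Iterable


def labels_match(node_labels: Iterable[str], pattern_labels: Iterable[str]) -> bool:
    """True when every label required by the pattern is present on the node."""
    return set(pattern_labels) <= set(node_labels)


if __name__ == "__main__":
    node = ["User", "Premium"]
    print(labels_match(node, ["User"]))             # (u:User)
    print(labels_match(node, ["User", "Premium"]))  # (u:User:Premium)
    print(labels_match(node, ["Admin"]))            # (a:Admin)
```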

### Property Filtering

```rust
// Submit a node with property active=true
// This will be filtered by Cypher WHERE clauses:
// MATCH (u:User) WHERE u.active = true
// MATCH (u:User) WHERE u.age > 18
```

### Relationship Traversal

```rust
// Submit a relation (user_001)-[:FOLLOWS]->(user_002)
// This enables Cypher patterns:
// MATCH (a:User)-[:FOLLOWS]->(b:User)
// MATCH (a)-[f:FOLLOWS]->(b) WHERE f.strength > 0.8
```

## Advanced Topics

### Dispatch Modes

**Channel Mode (Default):**
- Each subscriber gets an isolated channel
- Provides backpressure (blocks if subscriber is slow)
- No message loss for active subscribers (events are still dropped when no query is subscribed)
- Higher memory usage under load

**Broadcast Mode:**
- Single shared channel for all subscribers
- No backpressure (continues if subscriber is slow)
- May drop messages if buffer fills
- Lower latency and memory usage

```rust
let source = GrpcSource::builder("events")
    .with_dispatch_mode(DispatchMode::Broadcast)
    .with_dispatch_buffer_capacity(5000)
    .build()?;
```
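The trade-off between the two modes can be seen with two bounded buffers from the Python standard library. This is an analogy rather than Drasi's implementation: `queue.Queue` stands in for a channel with backpressure, and `collections.deque(maxlen=...)` stands in for a shared buffer that overwrites its oldest entries. Whether broadcast mode discards the oldest or the newest events is an assumption here.

```python
import queue
from collections import deque


def channel_style(events, capacity):
    """Bounded queue: when full, the producer has to wait; nothing is discarded."""
    buffer = queue.Queue(maxsize=capacity)
    waiting = []
    for event in events:
        try:
            buffer.put_nowait(event)
        except queue.Full:
            # A real producer would block here until the subscriber catches up.
            waiting.append(event)
    delivered = [buffer.get_nowait() for _ in range(buffer.qsize())]
    return delivered, waiting


def broadcast_style(events, capacity):
    """Fixed-size ring: when full, the oldest event is overwritten."""
    buffer = deque(maxlen=capacity)
    dropped = 0
    for event in events:
        if len(buffer) == capacity:
            dropped += 1
        buffer.append(event)
    return list(buffer), dropped


if __name__ == "__main__":
    print(channel_style(range(5), capacity=3))
    print(broadcast_style(range(5), capacity=3))
```

In the first case the slow consumer holds the producer back; in the second the producer never waits, at the cost of lost events once the buffer is full.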

### Bootstrap Providers

Enable initial data snapshots for queries:

```rust
use drasi_lib::bootstrap::BootstrapProvider;

struct MyBootstrapProvider {
    // ... your implementation
}

impl BootstrapProvider for MyBootstrapProvider {
    // ... implement trait methods
}

let source = GrpcSource::builder("events")
    .with_bootstrap_provider(MyBootstrapProvider::new())
    .build()?;
```

### Profiling and Performance

The gRPC source includes built-in profiling metadata:

```rust
// Automatically tracked:
// - source_send_ns: Timestamp when event enters the source
// - Propagated through the query pipeline
// - Available in reactions for end-to-end latency measurement
```
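Given a `source_send_ns` value, end-to-end latency is a subtraction and a unit conversion. The sketch below assumes the value is nanoseconds since the Unix epoch and that both readings come from clocks that are in sync; how a reaction obtains the value is not covered here, and `latency_ms` is a hypothetical helper.

```python
import time
from typing import Optional


def latency_ms(source_send_ns: int, now_ns: Optional[int] = None) -> float:
    """Milliseconds elapsed since the event entered the source."""
    if now_ns is None:
        now_ns = time.time_ns()
    return (now_ns - source_send_ns) / 1_000_000


if __name__ == "__main__":
    entered = time.time_ns()
    time.sleep(0.01)  # stand-in for query processing time
    print(f"{latency_ms(entered):.2f} ms")
```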

### Error Handling

**Validation Errors:**
- Missing required fields (metadata, in_node, out_node)
- Invalid change types
- Malformed protobuf messages

**Response contains:**
```rust
SubmitEventResponse {
    success: false,
    message: "Invalid event data",
    error: "Validation error: Node element missing required 'metadata' field",
    event_id: ""
}
```

**Streaming behavior:**
- Individual errors don't stop the stream
- Valid events continue processing
- Periodic responses include error counts
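Clients can catch the most common validation failures before sending anything. The sketch below checks plain dictionaries shaped like the `Element` message, using proto field names as keys. Only the node message text is taken from the response shown above; the relation messages are assumed to follow the same wording. `validate_element` is a hypothetical helper.

```python
from typing import Dict, List


def validate_element(element: Dict) -> List[str]:
    """Return a list of problems; an empty list means the element looks complete."""
    errors: List[str] = []
    if "node" in element:
        if not element["node"].get("metadata"):
            errors.append("Node element missing required 'metadata' field")
    elif "relation" in element:
        for field in ("metadata", "in_node", "out_node"):
            if not element["relation"].get(field):
                errors.append(f"Relation element missing required '{field}' field")
    else:
        errors.append("Element must contain a node or a relation")
    return errors


if __name__ == "__main__":
    print(validate_element({"node": {"properties": {"name": "Alice"}}}))
    print(validate_element({"node": {"metadata": {"labels": ["User"]}}}))
```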

### Graceful Shutdown

```rust
// The source handles shutdown signals gracefully:
// 1. Stops accepting new connections
// 2. Completes in-flight requests
// 3. Closes channels
// 4. Releases resources

drasi.stop_source("events").await?;
```

## Generating Client Code

### Python

```bash
# Install tools
pip install grpcio-tools

# Generate code
python -m grpc_tools.protoc \
    -I./proto \
    --python_out=./generated \
    --grpc_python_out=./generated \
    proto/drasi/v1/source.proto \
    proto/drasi/v1/common.proto
```

### Go

```bash
# Install tools
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# Generate code
protoc \
    --go_out=./generated \
    --go-grpc_out=./generated \
    --go_opt=paths=source_relative \
    --go-grpc_opt=paths=source_relative \
    -I ./proto \
    proto/drasi/v1/source.proto \
    proto/drasi/v1/common.proto
```

### Rust

Add to your `build.rs`:

```rust
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(false)
        .build_client(true)
        .compile(
            &[
                "proto/drasi/v1/source.proto",
                "proto/drasi/v1/common.proto",
            ],
            &["proto"],
        )?;
    Ok(())
}
```

### JavaScript/TypeScript

```bash
# Install tools
npm install -g grpc-tools grpc_tools_node_protoc_ts

# Generate code
grpc_tools_node_protoc \
    --js_out=import_style=commonjs,binary:./generated \
    --grpc_out=grpc_js:./generated \
    --plugin=protoc-gen-grpc=`which grpc_tools_node_protoc_plugin` \
    -I ./proto \
    proto/drasi/v1/source.proto \
    proto/drasi/v1/common.proto
```

## Performance Guidelines

### When to Use SubmitEvent (Unary)

- Low to moderate event rates (< 100 events/second)
- Need immediate per-event confirmation
- Interactive applications with user feedback
- Critical events requiring acknowledgment
- Testing and debugging

### When to Use StreamEvents (Streaming)

- High-volume ingestion (> 1000 events/second)
- Bulk data imports or migrations
- ETL pipelines with batching
- IoT sensor streams
- Log aggregation
- When network efficiency matters

### Best Practices

1. **Connection reuse**: Create one gRPC channel and reuse it for all requests
2. **Stream for bulk**: Use streaming for bulk operations (10x+ faster)
3. **Buffer sizing**: Tune `dispatch_buffer_capacity` based on throughput
4. **Connection pooling**: Consider a pool of channels for high-concurrency scenarios
5. **Keep-alive**: Configure gRPC keep-alive for long-lived connections
6. **Batch client-side**: Accumulate events before streaming
7. **Monitor metrics**: Track event processing rates and latency
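Client-side batching (item 6) can be as simple as grouping an iterator into fixed-size chunks, each of which is then sent over one stream. The sketch below uses only the standard library; `chunked` is a hypothetical helper, and the right chunk size depends on event size and throughput.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(events: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` events."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(events)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    for batch in chunked(range(5), 2):
        # Each batch could be passed to stub.StreamEvents(iter(batch)).
        print(batch)
```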

### Performance Benchmarks (Approximate)

| Scenario | Events/sec | Notes |
|----------|------------|-------|
| Unary RPC (single event) | 500-1000 | Round-trip per event |
| Streaming (batched) | 10,000-50,000 | Depends on event size |
| Streaming (small events) | 100,000+ | Minimal properties |

## Comparison with HTTP Source

| Feature | gRPC Source | HTTP Source |
|---------|------------|-------------|
| Protocol | HTTP/2 + Protocol Buffers | HTTP/1.1 + JSON |
| Type Safety | Strongly typed (protobuf) | JSON schema validation |
| Performance | Higher throughput, lower latency | Good for moderate loads |
| Streaming | Native bidirectional streaming | Batch POST endpoint |
| Message Size | Smaller (binary encoding) | Larger (text JSON) |
| Client Generation | Auto-generated from .proto files | Manual or OpenAPI codegen |
| Browser Support | Limited (requires grpc-web proxy) | Native browser support |
| Debugging | Requires gRPC tools (grpcurl, Postman) | Standard HTTP tools (curl) |
| Best For | High-volume, microservices, IoT | Web apps, simple integration |

## Security Considerations

The gRPC source currently runs without authentication or encryption. For production deployments:

### TLS/SSL

```rust
// Future enhancement: TLS configuration
// Configure server TLS certificates
// Enforce encrypted connections
```

### Authentication

```rust
// Future enhancement: Authentication interceptors
// Implement token-based auth
// Add API key validation
```

### Network Security

- Use firewall rules to restrict access
- Deploy behind a reverse proxy (Envoy, Nginx)
- Use Kubernetes network policies
- Implement rate limiting at proxy level

### Monitoring

- Track request rates and patterns
- Alert on unusual traffic spikes
- Log failed authentication attempts at the proxy, since the source itself does not authenticate requests
- Monitor resource usage

## Troubleshooting

### Connection Refused

```
Error: Connection refused
```

**Solutions:**
- Verify source is started: `drasi.start_source("my-grpc").await?`
- Check port availability: `lsof -i :50051`
- Verify host binding (use "0.0.0.0" not "localhost" for external access)
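A quick way to tell "nothing is listening" apart from a client-side problem is a plain TCP connection attempt. The sketch below uses only the standard `socket` module, and `is_listening` is a hypothetical helper. It confirms that something accepts connections on the port, not that it is the gRPC source, so a `HealthCheck` call is still the better end-to-end test.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """True when a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    print(is_listening("localhost", 50051))
```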

### Invalid Event Data

```
SubmitEventResponse { success: false, error: "Validation error: Node element missing required 'metadata' field" }
```

**Solutions:**
- Ensure all required fields are populated
- Check protobuf message structure
- Validate element_id is not empty
- Verify labels array is not empty

### Slow Event Processing

**Symptoms:** High latency, timeouts

**Solutions:**
- Increase `dispatch_buffer_capacity`
- Use streaming instead of unary calls
- Check query complexity
- Monitor system resources

### No Subscribers

```
[my-grpc-source] Failed to dispatch (no subscribers): No subscribers available
```

**This is normal:** Events are dropped if no queries are subscribed. Add a query:

```rust
let query = Query::cypher("my-query")
    .query("MATCH (n) RETURN n")
    .from_source("my-grpc-source")
    .build();

drasi.add_query(query).await?;
```

## Developer Notes

### Source Code Structure

- `src/lib.rs`: Main source implementation and gRPC service
- `src/config.rs`: Configuration structs and validation
- `src/tests.rs`: Unit tests
- `build.rs`: Protobuf compilation
- `proto/drasi/v1/`: Protobuf schema definitions

### Testing

```bash
# Run unit tests
cargo test -p drasi-source-grpc

# Run with logging
RUST_LOG=debug cargo test -p drasi-source-grpc -- --nocapture

# Test specific module
cargo test -p drasi-source-grpc proto_conversion
```

### Contributing

When modifying the gRPC source:

1. Update protobuf schemas in `proto/drasi/v1/`
2. Regenerate code: `cargo build` (runs `build.rs`)
3. Add tests for new functionality
4. Update this README with examples
5. Run `cargo clippy` and `cargo fmt`

## License

Licensed under the Apache License, Version 2.0. See the LICENSE file for details.

## Related Components

- **HTTP Source**: REST API alternative for web applications
- **PostgreSQL Source**: CDC from PostgreSQL databases
- **Application Source**: Programmatic event submission in Rust
- **Query Engine**: Continuous Cypher query evaluation
- **Reactions**: Output handlers for query results