Skip to main content

drasi_source_grpc/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! gRPC Source Plugin for Drasi
17//!
18//! This plugin exposes a gRPC endpoint for receiving data change events. External systems
19//! can stream events to Drasi using the gRPC protocol, which provides efficient binary
20//! serialization and bidirectional streaming support.
21//!
22//! # Service Endpoints
23//!
24//! The gRPC source implements the following service methods:
25//!
26//! - **`submit_event`** - Submit a single event (unary RPC)
27//! - **`stream_events`** - Stream multiple events (client streaming RPC)
28//! - **`request_bootstrap`** - Request initial data for bootstrapping (server streaming RPC)
29//! - **`health_check`** - Check service health (unary RPC)
30//!
31//! # Protocol Buffer Format
32//!
33//! Events are submitted using the `SourceChange` protobuf message. See the
34//! `proto/drasi/v1/source.proto` file for the full schema.
35//!
36//! ## Insert/Update
37//!
38//! ```protobuf
39//! SourceChange {
40//!     type: INSERT or UPDATE
41//!     change: Element {
42//!         node: Node {
43//!             metadata: ElementMetadata { ... }
44//!             properties: Struct { ... }
45//!         }
46//!     }
47//! }
48//! ```
49//!
50//! ## Delete
51//!
52//! ```protobuf
53//! SourceChange {
54//!     type: DELETE
55//!     change: Metadata {
56//!         reference: ElementReference { ... }
57//!         labels: ["Label1", "Label2"]
58//!     }
59//! }
60//! ```
61//!
62//! # Configuration
63//!
64//! | Field | Type | Default | Description |
65//! |-------|------|---------|-------------|
66//! | `host` | string | `"0.0.0.0"` | Host address to bind to |
67//! | `port` | u16 | `50051` | Port to listen on |
68//! | `endpoint` | string | None | Optional custom endpoint path |
69//! | `timeout_ms` | u64 | `5000` | Request timeout in milliseconds |
70//!
71//! # Example Configuration (YAML)
72//!
73//! ```yaml
74//! source_type: grpc
75//! properties:
76//!   host: "0.0.0.0"
77//!   port: 50051
78//! ```
79//!
80//! # Usage Example
81//!
82//! ```rust,ignore
83//! use drasi_source_grpc::{GrpcSource, GrpcSourceConfig};
84//! use std::sync::Arc;
85//!
86//! let config = GrpcSourceConfig {
87//!     host: "0.0.0.0".to_string(),
88//!     port: 50051,
89//!     endpoint: None,
90//!     timeout_ms: 5000,
91//! };
92//!
93//! let source = Arc::new(GrpcSource::new("my-grpc-source", config)?);
94//! drasi.add_source(source).await?;
95//! ```
96//!
97//! # Client Example (using grpcurl)
98//!
99//! ```bash
100//! grpcurl -plaintext -d '{"event": {...}}' localhost:50051 drasi.v1.SourceService/SubmitEvent
101//! ```
102
103pub mod config;
104pub mod descriptor;
105
106#[cfg(test)]
107mod tests;
108pub use config::GrpcSourceConfig;
109
110use anyhow::Result;
111use async_trait::async_trait;
112use log::{debug, error, info};
113use std::collections::HashMap;
114use std::sync::Arc;
115use tokio::sync::RwLock;
116use tonic::{transport::Server, Request, Response, Status};
117
118use drasi_lib::channels::{DispatchMode, *};
119use drasi_lib::managers::{log_component_start, log_component_stop};
120use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
121use drasi_lib::Source;
122use tracing::Instrument;
123
124// Include generated protobuf code
125// Allow unwrap in generated proto code - tonic generates code with unwrap() for HTTP response building
126#[allow(clippy::unwrap_used)]
127pub mod proto {
128    tonic::include_proto!("drasi.v1");
129}
130
131use proto::{
132    source_service_server::{SourceService, SourceServiceServer},
133    BootstrapRequest as ProtoBootstrapRequest, BootstrapResponse, HealthCheckResponse,
134    SourceChange as ProtoSourceChange, StreamEventResponse, SubmitEventRequest,
135    SubmitEventResponse,
136};
137
138/// gRPC source that exposes a gRPC endpoint to receive SourceChangeEvents.
139///
140/// This source implements a gRPC service for receiving data change events from external
141/// systems. It supports both unary and streaming RPC methods for event submission.
142///
143/// # Fields
144///
145/// - `base`: Common source functionality (dispatchers, status, lifecycle)
146/// - `config`: gRPC-specific configuration (host, port, timeout)
147pub struct GrpcSource {
148    /// Base source implementation providing common functionality
149    base: SourceBase,
150    /// gRPC source configuration
151    config: GrpcSourceConfig,
152}
153
154impl GrpcSource {
155    /// Create a builder for GrpcSource
156    ///
157    /// # Example
158    ///
159    /// ```rust,ignore
160    /// use drasi_source_grpc::GrpcSource;
161    ///
162    /// let source = GrpcSource::builder("my-grpc-source")
163    ///     .with_host("0.0.0.0")
164    ///     .with_port(50051)
165    ///     .with_bootstrap_provider(my_provider)
166    ///     .build()?;
167    /// ```
168    pub fn builder(id: impl Into<String>) -> GrpcSourceBuilder {
169        GrpcSourceBuilder::new(id)
170    }
171
172    /// Create a new gRPC source.
173    ///
174    /// The event channel is automatically injected when the source is added
175    /// to DrasiLib via `add_source()`.
176    ///
177    /// # Arguments
178    ///
179    /// * `id` - Unique identifier for this source instance
180    /// * `config` - gRPC source configuration
181    ///
182    /// # Returns
183    ///
184    /// A new `GrpcSource` instance, or an error if construction fails.
185    ///
186    /// # Errors
187    ///
188    /// Returns an error if the base source cannot be initialized.
189    ///
190    /// # Example
191    ///
192    /// ```rust,ignore
193    /// use drasi_source_grpc::{GrpcSource, GrpcSourceConfig};
194    ///
195    /// let config = GrpcSourceConfig {
196    ///     host: "0.0.0.0".to_string(),
197    ///     port: 50051,
198    ///     endpoint: None,
199    ///     timeout_ms: 5000,
200    /// };
201    ///
202    /// let source = GrpcSource::new("my-grpc-source", config)?;
203    /// ```
204    pub fn new(id: impl Into<String>, config: GrpcSourceConfig) -> Result<Self> {
205        let id = id.into();
206        let params = SourceBaseParams::new(id);
207        Ok(Self {
208            base: SourceBase::new(params)?,
209            config,
210        })
211    }
212
213    /// Create a new gRPC source with custom dispatch settings
214    ///
215    /// The event channel is automatically injected when the source is added
216    /// to DrasiLib via `add_source()`.
217    pub fn with_dispatch(
218        id: impl Into<String>,
219        config: GrpcSourceConfig,
220        dispatch_mode: Option<DispatchMode>,
221        dispatch_buffer_capacity: Option<usize>,
222    ) -> Result<Self> {
223        let id = id.into();
224        let mut params = SourceBaseParams::new(id);
225        if let Some(mode) = dispatch_mode {
226            params = params.with_dispatch_mode(mode);
227        }
228        if let Some(capacity) = dispatch_buffer_capacity {
229            params = params.with_dispatch_buffer_capacity(capacity);
230        }
231        Ok(Self {
232            base: SourceBase::new(params)?,
233            config,
234        })
235    }
236}
237
238#[async_trait]
239impl Source for GrpcSource {
240    fn id(&self) -> &str {
241        &self.base.id
242    }
243
244    fn type_name(&self) -> &str {
245        "grpc"
246    }
247
248    fn properties(&self) -> HashMap<String, serde_json::Value> {
249        use crate::descriptor::GrpcSourceConfigDto;
250        use drasi_plugin_sdk::ConfigValue;
251
252        let dto = GrpcSourceConfigDto {
253            host: ConfigValue::Static(self.config.host.clone()),
254            port: ConfigValue::Static(self.config.port),
255            endpoint: self
256                .config
257                .endpoint
258                .as_ref()
259                .map(|e| ConfigValue::Static(e.clone())),
260            timeout_ms: ConfigValue::Static(self.config.timeout_ms),
261        };
262
263        match serde_json::to_value(&dto) {
264            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
265            _ => HashMap::new(),
266        }
267    }
268
269    fn auto_start(&self) -> bool {
270        self.base.get_auto_start()
271    }
272
273    async fn start(&self) -> Result<()> {
274        log_component_start("gRPC Source", &self.base.id);
275
276        self.base
277            .set_status(
278                ComponentStatus::Starting,
279                Some("Starting gRPC source".to_string()),
280            )
281            .await;
282
283        // Get configuration
284        let host = self.config.host.clone();
285        let port = self.config.port;
286
287        let addr = format!("{host}:{port}").parse()?;
288
289        info!("gRPC source '{}' listening on {}", self.base.id, addr);
290
291        // Create shutdown channel
292        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
293        *self.base.shutdown_tx.write().await = Some(shutdown_tx);
294
295        // Get instance_id from context for log routing isolation
296        let instance_id = self
297            .base
298            .context()
299            .await
300            .map(|c| c.instance_id)
301            .unwrap_or_default();
302
303        // Create gRPC service
304        let service = GrpcSourceService {
305            source_id: self.base.id.clone(),
306            instance_id: instance_id.clone(),
307            dispatchers: self.base.dispatchers.clone(),
308        };
309
310        let svc = SourceServiceServer::new(service);
311
312        // Start the gRPC server
313        let source_id = self.base.id.clone();
314        let reporter = self.base.status_handle();
315
316        let source_id_for_span = source_id.clone();
317        let span = tracing::info_span!(
318            "grpc_source_server",
319            instance_id = %instance_id,
320            component_id = %source_id_for_span,
321            component_type = "source"
322        );
323        let task = tokio::spawn(
324            async move {
325                reporter
326                    .set_status(
327                        ComponentStatus::Running,
328                        Some(format!("gRPC source listening on {addr}")),
329                    )
330                    .await;
331
332                // Run the server with graceful shutdown
333                let server =
334                    Server::builder()
335                        .add_service(svc)
336                        .serve_with_shutdown(addr, async move {
337                            let _ = shutdown_rx.await;
338                            debug!("gRPC source received shutdown signal");
339                        });
340
341                if let Err(e) = server.await {
342                    error!("gRPC server error: {e}");
343                }
344
345                reporter.set_status(ComponentStatus::Stopped, None).await;
346            }
347            .instrument(span),
348        );
349
350        *self.base.task_handle.write().await = Some(task);
351        // Note: Running status is set inside the spawned task with the
352        // informative "listening on {addr}" message. No duplicate set here.
353
354        Ok(())
355    }
356
357    async fn stop(&self) -> Result<()> {
358        log_component_stop("gRPC Source", &self.base.id);
359        self.base.stop_common().await
360    }
361
362    async fn status(&self) -> ComponentStatus {
363        self.base.get_status().await
364    }
365
366    async fn subscribe(
367        &self,
368        settings: drasi_lib::config::SourceSubscriptionSettings,
369    ) -> Result<SubscriptionResponse> {
370        self.base.subscribe_with_bootstrap(&settings, "gRPC").await
371    }
372
373    fn as_any(&self) -> &dyn std::any::Any {
374        self
375    }
376
377    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
378        self.base.initialize(context).await;
379    }
380
381    async fn set_bootstrap_provider(
382        &self,
383        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
384    ) {
385        self.base.set_bootstrap_provider(provider).await;
386    }
387}
388
389/// gRPC service implementation for the SourceService RPC methods.
390///
391/// Handles incoming gRPC requests and dispatches events to registered subscribers.
392struct GrpcSourceService {
393    /// The source ID used for event attribution
394    source_id: String,
395    /// Instance ID for log routing isolation
396    instance_id: String,
397    /// Shared dispatchers for sending events to subscribers
398    dispatchers: Arc<
399        RwLock<
400            Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
401        >,
402    >,
403}
404
405#[tonic::async_trait]
406impl SourceService for GrpcSourceService {
407    async fn submit_event(
408        &self,
409        request: Request<SubmitEventRequest>,
410    ) -> Result<Response<SubmitEventResponse>, Status> {
411        let event_request = request.into_inner();
412
413        if let Some(proto_change) = event_request.event {
414            match convert_proto_to_source_change(&proto_change, &self.source_id) {
415                Ok(source_change) => {
416                    // Create profiling metadata with timestamps
417                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
418                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
419
420                    let wrapper = SourceEventWrapper::with_profiling(
421                        self.source_id.clone(),
422                        SourceEvent::Change(source_change),
423                        chrono::Utc::now(),
424                        profiling,
425                    );
426
427                    debug!("[{}] Processing gRPC event: {:?}", self.source_id, &wrapper);
428
429                    // Dispatch via helper
430                    if let Err(e) = SourceBase::dispatch_from_task(
431                        self.dispatchers.clone(),
432                        wrapper,
433                        &self.source_id,
434                    )
435                    .await
436                    {
437                        debug!(
438                            "[{}] Failed to dispatch (no subscribers): {}",
439                            self.source_id, e
440                        );
441                    }
442
443                    debug!("[{}] Successfully processed gRPC event", self.source_id);
444                    Ok(Response::new(SubmitEventResponse {
445                        success: true,
446                        message: "Event processed successfully".to_string(),
447                        error: String::new(),
448                        event_id: uuid::Uuid::new_v4().to_string(),
449                    }))
450                }
451                Err(e) => {
452                    error!("[{}] Invalid event data: {}", self.source_id, e);
453                    Ok(Response::new(SubmitEventResponse {
454                        success: false,
455                        message: "Invalid event data".to_string(),
456                        error: e.to_string(),
457                        event_id: String::new(),
458                    }))
459                }
460            }
461        } else {
462            Ok(Response::new(SubmitEventResponse {
463                success: false,
464                message: "No event provided".to_string(),
465                error: "Event is required".to_string(),
466                event_id: String::new(),
467            }))
468        }
469    }
470
471    type StreamEventsStream =
472        tokio_stream::wrappers::ReceiverStream<Result<StreamEventResponse, Status>>;
473
474    async fn stream_events(
475        &self,
476        request: Request<tonic::Streaming<ProtoSourceChange>>,
477    ) -> Result<Response<Self::StreamEventsStream>, Status> {
478        let mut stream = request.into_inner();
479        let source_id = self.source_id.clone();
480        let instance_id = self.instance_id.clone();
481        let dispatchers = self.dispatchers.clone();
482
483        let (tx, rx) = tokio::sync::mpsc::channel(128);
484
485        let source_id_for_span = source_id.clone();
486        let span = tracing::info_span!(
487            "grpc_stream_events",
488            instance_id = %instance_id,
489            component_id = %source_id_for_span,
490            component_type = "source"
491        );
492        tokio::spawn(
493            async move {
494                let mut events_processed = 0u64;
495
496                while let Ok(Some(proto_change)) = stream.message().await {
497                    match convert_proto_to_source_change(&proto_change, &source_id) {
498                        Ok(source_change) => {
499                            // Create profiling metadata with timestamps
500                            let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
501                            profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
502
503                            let wrapper = SourceEventWrapper::with_profiling(
504                                source_id.clone(),
505                                SourceEvent::Change(source_change),
506                                chrono::Utc::now(),
507                                profiling,
508                            );
509
510                            // Dispatch via helper
511                            if let Err(e) = SourceBase::dispatch_from_task(
512                                dispatchers.clone(),
513                                wrapper.clone(),
514                                &source_id,
515                            )
516                            .await
517                            {
518                                debug!("[{source_id}] Failed to dispatch (no subscribers): {e}");
519                            }
520
521                            events_processed += 1;
522
523                            // Send periodic updates
524                            if events_processed.is_multiple_of(100) {
525                                let _ = tx
526                                    .send(Ok(StreamEventResponse {
527                                        success: true,
528                                        message: format!("Processed {events_processed} events"),
529                                        error: String::new(),
530                                        events_processed,
531                                    }))
532                                    .await;
533                            }
534                        }
535                        Err(e) => {
536                            error!("[{source_id}] Invalid event data: {e}");
537                            let _ = tx
538                                .send(Ok(StreamEventResponse {
539                                    success: false,
540                                    message: "Invalid event data".to_string(),
541                                    error: e.to_string(),
542                                    events_processed,
543                                }))
544                                .await;
545                        }
546                    }
547                }
548
549                // Send final response
550                let _ = tx
551                    .send(Ok(StreamEventResponse {
552                        success: true,
553                        message: format!("Stream completed. Processed {events_processed} events"),
554                        error: String::new(),
555                        events_processed,
556                    }))
557                    .await;
558            }
559            .instrument(span),
560        );
561
562        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
563            rx,
564        )))
565    }
566
567    type RequestBootstrapStream =
568        tokio_stream::wrappers::ReceiverStream<Result<BootstrapResponse, Status>>;
569
570    async fn request_bootstrap(
571        &self,
572        _request: Request<ProtoBootstrapRequest>,
573    ) -> Result<Response<Self::RequestBootstrapStream>, Status> {
574        // For now, return empty stream
575        // This could be extended to support bootstrap
576        let (tx, rx) = tokio::sync::mpsc::channel(1);
577
578        let instance_id = self.instance_id.clone();
579        let source_id_for_span = self.source_id.clone();
580        let span = tracing::info_span!(
581            "grpc_request_bootstrap",
582            instance_id = %instance_id,
583            component_id = %source_id_for_span,
584            component_type = "source"
585        );
586        tokio::spawn(
587            async move {
588                let _ = tx
589                    .send(Ok(BootstrapResponse {
590                        elements: vec![],
591                        total_count: 0,
592                    }))
593                    .await;
594            }
595            .instrument(span),
596        );
597
598        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
599            rx,
600        )))
601    }
602
603    async fn health_check(
604        &self,
605        _request: Request<()>,
606    ) -> Result<Response<HealthCheckResponse>, Status> {
607        Ok(Response::new(HealthCheckResponse {
608            status: proto::health_check_response::Status::Healthy as i32,
609            message: "gRPC source is healthy".to_string(),
610            version: env!("CARGO_PKG_VERSION").to_string(),
611        }))
612    }
613}
614
615/// Convert protobuf SourceChange to Drasi Core SourceChange.
616///
617/// # Arguments
618///
619/// * `proto_change` - The protobuf source change message
620/// * `source_id` - Source ID to use in element references
621///
622/// # Returns
623///
624/// A Drasi Core `SourceChange` or an error if conversion fails.
625///
626/// # Errors
627///
628/// Returns an error if:
629/// - The change type is invalid
630/// - Required fields are missing
631/// - Element data cannot be converted
632fn convert_proto_to_source_change(
633    proto_change: &ProtoSourceChange,
634    source_id: &str,
635) -> Result<drasi_core::models::SourceChange> {
636    use drasi_core::models::SourceChange;
637    use proto::source_change::Change;
638
639    let change_type = proto::ChangeType::try_from(proto_change.r#type)
640        .map_err(|_| anyhow::anyhow!("Invalid change type"))?;
641
642    match (change_type, &proto_change.change) {
643        (
644            proto::ChangeType::Insert | proto::ChangeType::Update,
645            Some(Change::Element(proto_element)),
646        ) => {
647            let element = convert_proto_element_to_core(proto_element, source_id)?;
648
649            if change_type == proto::ChangeType::Insert {
650                Ok(SourceChange::Insert { element })
651            } else {
652                Ok(SourceChange::Update { element })
653            }
654        }
655        (proto::ChangeType::Delete, Some(Change::Metadata(proto_metadata))) => {
656            let metadata = convert_proto_metadata_to_core(proto_metadata, source_id)?;
657            Ok(SourceChange::Delete { metadata })
658        }
659        _ => Err(anyhow::anyhow!("Invalid change type or missing data")),
660    }
661}
662
663/// Convert protobuf Element to Drasi Core Element.
664///
665/// Handles both Node and Relation element types.
666fn convert_proto_element_to_core(
667    proto_element: &proto::Element,
668    source_id: &str,
669) -> Result<drasi_core::models::Element> {
670    use drasi_core::models::{Element, ElementReference};
671    use proto::element::Element as ProtoElementType;
672
673    match &proto_element.element {
674        Some(ProtoElementType::Node(node)) => {
675            let metadata = node
676                .metadata
677                .as_ref()
678                .ok_or_else(|| anyhow::anyhow!(
679                    "Validation error: Node element missing required 'metadata' field in gRPC message. \
680                     Ensure the gRPC client sends complete node data."
681                ))?;
682
683            let metadata = convert_proto_metadata_to_core(metadata, source_id)?;
684            let properties = convert_proto_properties(&node.properties)?;
685
686            Ok(Element::Node {
687                metadata,
688                properties,
689            })
690        }
691        Some(ProtoElementType::Relation(relation)) => {
692            let metadata = relation
693                .metadata
694                .as_ref()
695                .ok_or_else(|| anyhow::anyhow!(
696                    "Validation error: Relation element missing required 'metadata' field in gRPC message. \
697                     Ensure the gRPC client sends complete relation data."
698                ))?;
699
700            let metadata = convert_proto_metadata_to_core(metadata, source_id)?;
701            let properties = convert_proto_properties(&relation.properties)?;
702
703            let in_node = relation.in_node.as_ref().ok_or_else(|| {
704                anyhow::anyhow!(
705                    "Validation error: Relation missing required 'in_node' field. \
706                     Relations must specify both source and target nodes."
707                )
708            })?;
709            let out_node = relation.out_node.as_ref().ok_or_else(|| {
710                anyhow::anyhow!(
711                    "Validation error: Relation missing required 'out_node' field. \
712                     Relations must specify both source and target nodes."
713                )
714            })?;
715
716            Ok(Element::Relation {
717                metadata,
718                properties,
719                in_node: ElementReference {
720                    source_id: Arc::from(in_node.source_id.as_str()),
721                    element_id: Arc::from(in_node.element_id.as_str()),
722                },
723                out_node: ElementReference {
724                    source_id: Arc::from(out_node.source_id.as_str()),
725                    element_id: Arc::from(out_node.element_id.as_str()),
726                },
727            })
728        }
729        None => Err(anyhow::anyhow!("Element type not specified")),
730    }
731}
732
733/// Convert protobuf ElementMetadata to Drasi Core ElementMetadata
734fn convert_proto_metadata_to_core(
735    proto_metadata: &proto::ElementMetadata,
736    source_id: &str,
737) -> Result<drasi_core::models::ElementMetadata> {
738    use drasi_core::models::{ElementMetadata, ElementReference};
739
740    let reference = proto_metadata
741        .reference
742        .as_ref()
743        .ok_or_else(|| anyhow::anyhow!("Metadata missing reference"))?;
744
745    Ok(ElementMetadata {
746        reference: ElementReference {
747            source_id: Arc::from(source_id),
748            element_id: Arc::from(reference.element_id.as_str()),
749        },
750        labels: Arc::from(
751            proto_metadata
752                .labels
753                .iter()
754                .map(|s| Arc::from(s.as_str()))
755                .collect::<Vec<_>>(),
756        ),
757        effective_from: proto_metadata.effective_from / 1_000_000, // Convert nanoseconds to milliseconds
758    })
759}
760
761/// Convert protobuf Struct to ElementPropertyMap
762fn convert_proto_properties(
763    props: &Option<prost_types::Struct>,
764) -> Result<drasi_core::models::ElementPropertyMap> {
765    use drasi_core::models::ElementPropertyMap;
766
767    let mut properties = ElementPropertyMap::new();
768
769    if let Some(struct_props) = props {
770        for (key, value) in &struct_props.fields {
771            properties.insert(key, convert_proto_value_to_element_value(value)?);
772        }
773    }
774
775    Ok(properties)
776}
777
778/// Convert protobuf Value to ElementValue
779fn convert_proto_value_to_element_value(
780    value: &prost_types::Value,
781) -> Result<drasi_core::models::ElementValue> {
782    use drasi_core::models::ElementValue;
783    use ordered_float::OrderedFloat;
784    use prost_types::value::Kind;
785
786    match &value.kind {
787        Some(Kind::NullValue(_)) => Ok(ElementValue::Null),
788        Some(Kind::BoolValue(b)) => Ok(ElementValue::Bool(*b)),
789        Some(Kind::NumberValue(n)) => {
790            if n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
791                Ok(ElementValue::Integer(*n as i64))
792            } else {
793                Ok(ElementValue::Float(OrderedFloat(*n)))
794            }
795        }
796        Some(Kind::StringValue(s)) => Ok(ElementValue::String(Arc::from(s.as_str()))),
797        Some(Kind::ListValue(_)) | Some(Kind::StructValue(_)) => {
798            // For complex types, convert to JSON string
799            let json_val = proto_value_to_json(value);
800            Ok(ElementValue::String(Arc::from(serde_json::to_string(
801                &json_val,
802            )?)))
803        }
804        None => Ok(ElementValue::Null),
805    }
806}
807
808/// Convert protobuf Value to JSON for complex types
809fn proto_value_to_json(value: &prost_types::Value) -> serde_json::Value {
810    use prost_types::value::Kind;
811
812    match &value.kind {
813        Some(Kind::NullValue(_)) => serde_json::Value::Null,
814        Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
815        Some(Kind::NumberValue(n)) => serde_json::json!(*n),
816        Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
817        Some(Kind::ListValue(list)) => {
818            let arr: Vec<serde_json::Value> = list.values.iter().map(proto_value_to_json).collect();
819            serde_json::Value::Array(arr)
820        }
821        Some(Kind::StructValue(s)) => {
822            let mut map = serde_json::Map::new();
823            for (key, val) in &s.fields {
824                map.insert(key.clone(), proto_value_to_json(val));
825            }
826            serde_json::Value::Object(map)
827        }
828        None => serde_json::Value::Null,
829    }
830}
831
832/// Builder for gRPC sources.
833///
834/// Provides a fluent API for constructing gRPC sources
835/// with sensible defaults.
836///
837/// # Example
838///
839/// ```rust,ignore
840/// use drasi_source_grpc::GrpcSource;
841///
842/// let source = GrpcSource::builder("my-grpc-source")
843///     .with_host("0.0.0.0")
844///     .with_port(50051)
845///     .build()?;
846/// ```
847pub struct GrpcSourceBuilder {
848    id: String,
849    host: String,
850    port: u16,
851    endpoint: Option<String>,
852    timeout_ms: u64,
853    dispatch_mode: Option<DispatchMode>,
854    dispatch_buffer_capacity: Option<usize>,
855    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
856    auto_start: bool,
857}
858
859impl GrpcSourceBuilder {
860    /// Create a new gRPC source builder with the given ID and default values
861    pub fn new(id: impl Into<String>) -> Self {
862        Self {
863            id: id.into(),
864            host: "0.0.0.0".to_string(),
865            port: 50051,
866            endpoint: None,
867            timeout_ms: 5000,
868            dispatch_mode: None,
869            dispatch_buffer_capacity: None,
870            bootstrap_provider: None,
871            auto_start: true,
872        }
873    }
874
875    /// Set the gRPC host
876    pub fn with_host(mut self, host: impl Into<String>) -> Self {
877        self.host = host.into();
878        self
879    }
880
881    /// Set the gRPC port
882    pub fn with_port(mut self, port: u16) -> Self {
883        self.port = port;
884        self
885    }
886
887    /// Set the optional service endpoint
888    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
889        self.endpoint = Some(endpoint.into());
890        self
891    }
892
893    /// Set the request timeout in milliseconds
894    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
895        self.timeout_ms = timeout_ms;
896        self
897    }
898
899    /// Set the dispatch mode for this source
900    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
901        self.dispatch_mode = Some(mode);
902        self
903    }
904
905    /// Set the dispatch buffer capacity for this source
906    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
907        self.dispatch_buffer_capacity = Some(capacity);
908        self
909    }
910
911    /// Set the bootstrap provider for this source
912    pub fn with_bootstrap_provider(
913        mut self,
914        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
915    ) -> Self {
916        self.bootstrap_provider = Some(Box::new(provider));
917        self
918    }
919
920    /// Set whether this source should auto-start when DrasiLib starts.
921    ///
922    /// Default is `true`. Set to `false` if this source should only be
923    /// started manually via `start_source()`.
924    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
925        self.auto_start = auto_start;
926        self
927    }
928
929    /// Set the full configuration at once
930    pub fn with_config(mut self, config: GrpcSourceConfig) -> Self {
931        self.host = config.host;
932        self.port = config.port;
933        self.endpoint = config.endpoint;
934        self.timeout_ms = config.timeout_ms;
935        self
936    }
937
938    /// Build the gRPC source
939    ///
940    /// # Errors
941    ///
942    /// Returns an error if the source cannot be constructed.
943    pub fn build(self) -> Result<GrpcSource> {
944        let config = GrpcSourceConfig {
945            host: self.host,
946            port: self.port,
947            endpoint: self.endpoint,
948            timeout_ms: self.timeout_ms,
949        };
950
951        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
952        if let Some(mode) = self.dispatch_mode {
953            params = params.with_dispatch_mode(mode);
954        }
955        if let Some(capacity) = self.dispatch_buffer_capacity {
956            params = params.with_dispatch_buffer_capacity(capacity);
957        }
958        if let Some(provider) = self.bootstrap_provider {
959            params = params.with_bootstrap_provider(provider);
960        }
961
962        Ok(GrpcSource {
963            base: SourceBase::new(params)?,
964            config,
965        })
966    }
967}
968
969/// Dynamic plugin entry point.
970///
971/// Dynamic plugin entry point.
972#[cfg(feature = "dynamic-plugin")]
973drasi_plugin_sdk::export_plugin!(
974    plugin_id = "grpc-source",
975    core_version = env!("CARGO_PKG_VERSION"),
976    lib_version = env!("CARGO_PKG_VERSION"),
977    plugin_version = env!("CARGO_PKG_VERSION"),
978    source_descriptors = [descriptor::GrpcSourceDescriptor],
979    reaction_descriptors = [],
980    bootstrap_descriptors = [],
981);