1#![allow(unexpected_cfgs)]
2pub mod config;
104pub mod descriptor;
105
106#[cfg(test)]
107mod tests;
108pub use config::GrpcSourceConfig;
109
110use anyhow::Result;
111use async_trait::async_trait;
112use log::{debug, error, info};
113use std::collections::HashMap;
114use std::sync::Arc;
115use tokio::sync::RwLock;
116use tonic::{transport::Server, Request, Response, Status};
117
118use drasi_lib::channels::{DispatchMode, *};
119use drasi_lib::managers::{log_component_start, log_component_stop};
120use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
121use drasi_lib::Source;
122use tracing::Instrument;
123
124#[allow(clippy::unwrap_used)]
127pub mod proto {
128 tonic::include_proto!("drasi.v1");
129}
130
131use proto::{
132 source_service_server::{SourceService, SourceServiceServer},
133 BootstrapRequest as ProtoBootstrapRequest, BootstrapResponse, HealthCheckResponse,
134 SourceChange as ProtoSourceChange, StreamEventResponse, SubmitEventRequest,
135 SubmitEventResponse,
136};
137
138pub struct GrpcSource {
148 base: SourceBase,
150 config: GrpcSourceConfig,
152}
153
154impl GrpcSource {
155 pub fn builder(id: impl Into<String>) -> GrpcSourceBuilder {
169 GrpcSourceBuilder::new(id)
170 }
171
172 pub fn new(id: impl Into<String>, config: GrpcSourceConfig) -> Result<Self> {
205 let id = id.into();
206 let params = SourceBaseParams::new(id);
207 Ok(Self {
208 base: SourceBase::new(params)?,
209 config,
210 })
211 }
212
213 pub fn with_dispatch(
218 id: impl Into<String>,
219 config: GrpcSourceConfig,
220 dispatch_mode: Option<DispatchMode>,
221 dispatch_buffer_capacity: Option<usize>,
222 ) -> Result<Self> {
223 let id = id.into();
224 let mut params = SourceBaseParams::new(id);
225 if let Some(mode) = dispatch_mode {
226 params = params.with_dispatch_mode(mode);
227 }
228 if let Some(capacity) = dispatch_buffer_capacity {
229 params = params.with_dispatch_buffer_capacity(capacity);
230 }
231 Ok(Self {
232 base: SourceBase::new(params)?,
233 config,
234 })
235 }
236}
237
238#[async_trait]
239impl Source for GrpcSource {
240 fn id(&self) -> &str {
241 &self.base.id
242 }
243
244 fn type_name(&self) -> &str {
245 "grpc"
246 }
247
248 fn properties(&self) -> HashMap<String, serde_json::Value> {
249 use crate::descriptor::GrpcSourceConfigDto;
250 use drasi_plugin_sdk::ConfigValue;
251
252 let dto = GrpcSourceConfigDto {
253 host: ConfigValue::Static(self.config.host.clone()),
254 port: ConfigValue::Static(self.config.port),
255 endpoint: self
256 .config
257 .endpoint
258 .as_ref()
259 .map(|e| ConfigValue::Static(e.clone())),
260 timeout_ms: ConfigValue::Static(self.config.timeout_ms),
261 };
262
263 match serde_json::to_value(&dto) {
264 Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
265 _ => HashMap::new(),
266 }
267 }
268
269 fn auto_start(&self) -> bool {
270 self.base.get_auto_start()
271 }
272
273 async fn start(&self) -> Result<()> {
274 log_component_start("gRPC Source", &self.base.id);
275
276 self.base
277 .set_status(
278 ComponentStatus::Starting,
279 Some("Starting gRPC source".to_string()),
280 )
281 .await;
282
283 let host = self.config.host.clone();
285 let port = self.config.port;
286
287 let addr = format!("{host}:{port}").parse()?;
288
289 info!("gRPC source '{}' listening on {}", self.base.id, addr);
290
291 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
293 *self.base.shutdown_tx.write().await = Some(shutdown_tx);
294
295 let instance_id = self
297 .base
298 .context()
299 .await
300 .map(|c| c.instance_id)
301 .unwrap_or_default();
302
303 let service = GrpcSourceService {
305 source_id: self.base.id.clone(),
306 instance_id: instance_id.clone(),
307 dispatchers: self.base.dispatchers.clone(),
308 };
309
310 let svc = SourceServiceServer::new(service);
311
312 let source_id = self.base.id.clone();
314 let reporter = self.base.status_handle();
315
316 let source_id_for_span = source_id.clone();
317 let span = tracing::info_span!(
318 "grpc_source_server",
319 instance_id = %instance_id,
320 component_id = %source_id_for_span,
321 component_type = "source"
322 );
323 let task = tokio::spawn(
324 async move {
325 reporter
326 .set_status(
327 ComponentStatus::Running,
328 Some(format!("gRPC source listening on {addr}")),
329 )
330 .await;
331
332 let server =
334 Server::builder()
335 .add_service(svc)
336 .serve_with_shutdown(addr, async move {
337 let _ = shutdown_rx.await;
338 debug!("gRPC source received shutdown signal");
339 });
340
341 if let Err(e) = server.await {
342 error!("gRPC server error: {e}");
343 }
344
345 reporter.set_status(ComponentStatus::Stopped, None).await;
346 }
347 .instrument(span),
348 );
349
350 *self.base.task_handle.write().await = Some(task);
351 Ok(())
355 }
356
357 async fn stop(&self) -> Result<()> {
358 log_component_stop("gRPC Source", &self.base.id);
359 self.base.stop_common().await
360 }
361
362 async fn status(&self) -> ComponentStatus {
363 self.base.get_status().await
364 }
365
366 async fn subscribe(
367 &self,
368 settings: drasi_lib::config::SourceSubscriptionSettings,
369 ) -> Result<SubscriptionResponse> {
370 self.base.subscribe_with_bootstrap(&settings, "gRPC").await
371 }
372
373 fn as_any(&self) -> &dyn std::any::Any {
374 self
375 }
376
377 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
378 self.base.initialize(context).await;
379 }
380
381 async fn set_bootstrap_provider(
382 &self,
383 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
384 ) {
385 self.base.set_bootstrap_provider(provider).await;
386 }
387}
388
389struct GrpcSourceService {
393 source_id: String,
395 instance_id: String,
397 dispatchers: Arc<
399 RwLock<
400 Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
401 >,
402 >,
403}
404
405#[tonic::async_trait]
406impl SourceService for GrpcSourceService {
407 async fn submit_event(
408 &self,
409 request: Request<SubmitEventRequest>,
410 ) -> Result<Response<SubmitEventResponse>, Status> {
411 let event_request = request.into_inner();
412
413 if let Some(proto_change) = event_request.event {
414 match convert_proto_to_source_change(&proto_change, &self.source_id) {
415 Ok(source_change) => {
416 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
418 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
419
420 let wrapper = SourceEventWrapper::with_profiling(
421 self.source_id.clone(),
422 SourceEvent::Change(source_change),
423 chrono::Utc::now(),
424 profiling,
425 );
426
427 debug!("[{}] Processing gRPC event: {:?}", self.source_id, &wrapper);
428
429 if let Err(e) = SourceBase::dispatch_from_task(
431 self.dispatchers.clone(),
432 wrapper,
433 &self.source_id,
434 )
435 .await
436 {
437 debug!(
438 "[{}] Failed to dispatch (no subscribers): {}",
439 self.source_id, e
440 );
441 }
442
443 debug!("[{}] Successfully processed gRPC event", self.source_id);
444 Ok(Response::new(SubmitEventResponse {
445 success: true,
446 message: "Event processed successfully".to_string(),
447 error: String::new(),
448 event_id: uuid::Uuid::new_v4().to_string(),
449 }))
450 }
451 Err(e) => {
452 error!("[{}] Invalid event data: {}", self.source_id, e);
453 Ok(Response::new(SubmitEventResponse {
454 success: false,
455 message: "Invalid event data".to_string(),
456 error: e.to_string(),
457 event_id: String::new(),
458 }))
459 }
460 }
461 } else {
462 Ok(Response::new(SubmitEventResponse {
463 success: false,
464 message: "No event provided".to_string(),
465 error: "Event is required".to_string(),
466 event_id: String::new(),
467 }))
468 }
469 }
470
471 type StreamEventsStream =
472 tokio_stream::wrappers::ReceiverStream<Result<StreamEventResponse, Status>>;
473
474 async fn stream_events(
475 &self,
476 request: Request<tonic::Streaming<ProtoSourceChange>>,
477 ) -> Result<Response<Self::StreamEventsStream>, Status> {
478 let mut stream = request.into_inner();
479 let source_id = self.source_id.clone();
480 let instance_id = self.instance_id.clone();
481 let dispatchers = self.dispatchers.clone();
482
483 let (tx, rx) = tokio::sync::mpsc::channel(128);
484
485 let source_id_for_span = source_id.clone();
486 let span = tracing::info_span!(
487 "grpc_stream_events",
488 instance_id = %instance_id,
489 component_id = %source_id_for_span,
490 component_type = "source"
491 );
492 tokio::spawn(
493 async move {
494 let mut events_processed = 0u64;
495
496 while let Ok(Some(proto_change)) = stream.message().await {
497 match convert_proto_to_source_change(&proto_change, &source_id) {
498 Ok(source_change) => {
499 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
501 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
502
503 let wrapper = SourceEventWrapper::with_profiling(
504 source_id.clone(),
505 SourceEvent::Change(source_change),
506 chrono::Utc::now(),
507 profiling,
508 );
509
510 if let Err(e) = SourceBase::dispatch_from_task(
512 dispatchers.clone(),
513 wrapper.clone(),
514 &source_id,
515 )
516 .await
517 {
518 debug!("[{source_id}] Failed to dispatch (no subscribers): {e}");
519 }
520
521 events_processed += 1;
522
523 if events_processed.is_multiple_of(100) {
525 let _ = tx
526 .send(Ok(StreamEventResponse {
527 success: true,
528 message: format!("Processed {events_processed} events"),
529 error: String::new(),
530 events_processed,
531 }))
532 .await;
533 }
534 }
535 Err(e) => {
536 error!("[{source_id}] Invalid event data: {e}");
537 let _ = tx
538 .send(Ok(StreamEventResponse {
539 success: false,
540 message: "Invalid event data".to_string(),
541 error: e.to_string(),
542 events_processed,
543 }))
544 .await;
545 }
546 }
547 }
548
549 let _ = tx
551 .send(Ok(StreamEventResponse {
552 success: true,
553 message: format!("Stream completed. Processed {events_processed} events"),
554 error: String::new(),
555 events_processed,
556 }))
557 .await;
558 }
559 .instrument(span),
560 );
561
562 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
563 rx,
564 )))
565 }
566
567 type RequestBootstrapStream =
568 tokio_stream::wrappers::ReceiverStream<Result<BootstrapResponse, Status>>;
569
570 async fn request_bootstrap(
571 &self,
572 _request: Request<ProtoBootstrapRequest>,
573 ) -> Result<Response<Self::RequestBootstrapStream>, Status> {
574 let (tx, rx) = tokio::sync::mpsc::channel(1);
577
578 let instance_id = self.instance_id.clone();
579 let source_id_for_span = self.source_id.clone();
580 let span = tracing::info_span!(
581 "grpc_request_bootstrap",
582 instance_id = %instance_id,
583 component_id = %source_id_for_span,
584 component_type = "source"
585 );
586 tokio::spawn(
587 async move {
588 let _ = tx
589 .send(Ok(BootstrapResponse {
590 elements: vec![],
591 total_count: 0,
592 }))
593 .await;
594 }
595 .instrument(span),
596 );
597
598 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
599 rx,
600 )))
601 }
602
603 async fn health_check(
604 &self,
605 _request: Request<()>,
606 ) -> Result<Response<HealthCheckResponse>, Status> {
607 Ok(Response::new(HealthCheckResponse {
608 status: proto::health_check_response::Status::Healthy as i32,
609 message: "gRPC source is healthy".to_string(),
610 version: env!("CARGO_PKG_VERSION").to_string(),
611 }))
612 }
613}
614
615fn convert_proto_to_source_change(
633 proto_change: &ProtoSourceChange,
634 source_id: &str,
635) -> Result<drasi_core::models::SourceChange> {
636 use drasi_core::models::SourceChange;
637 use proto::source_change::Change;
638
639 let change_type = proto::ChangeType::try_from(proto_change.r#type)
640 .map_err(|_| anyhow::anyhow!("Invalid change type"))?;
641
642 match (change_type, &proto_change.change) {
643 (
644 proto::ChangeType::Insert | proto::ChangeType::Update,
645 Some(Change::Element(proto_element)),
646 ) => {
647 let element = convert_proto_element_to_core(proto_element, source_id)?;
648
649 if change_type == proto::ChangeType::Insert {
650 Ok(SourceChange::Insert { element })
651 } else {
652 Ok(SourceChange::Update { element })
653 }
654 }
655 (proto::ChangeType::Delete, Some(Change::Metadata(proto_metadata))) => {
656 let metadata = convert_proto_metadata_to_core(proto_metadata, source_id)?;
657 Ok(SourceChange::Delete { metadata })
658 }
659 _ => Err(anyhow::anyhow!("Invalid change type or missing data")),
660 }
661}
662
663fn convert_proto_element_to_core(
667 proto_element: &proto::Element,
668 source_id: &str,
669) -> Result<drasi_core::models::Element> {
670 use drasi_core::models::{Element, ElementReference};
671 use proto::element::Element as ProtoElementType;
672
673 match &proto_element.element {
674 Some(ProtoElementType::Node(node)) => {
675 let metadata = node
676 .metadata
677 .as_ref()
678 .ok_or_else(|| anyhow::anyhow!(
679 "Validation error: Node element missing required 'metadata' field in gRPC message. \
680 Ensure the gRPC client sends complete node data."
681 ))?;
682
683 let metadata = convert_proto_metadata_to_core(metadata, source_id)?;
684 let properties = convert_proto_properties(&node.properties)?;
685
686 Ok(Element::Node {
687 metadata,
688 properties,
689 })
690 }
691 Some(ProtoElementType::Relation(relation)) => {
692 let metadata = relation
693 .metadata
694 .as_ref()
695 .ok_or_else(|| anyhow::anyhow!(
696 "Validation error: Relation element missing required 'metadata' field in gRPC message. \
697 Ensure the gRPC client sends complete relation data."
698 ))?;
699
700 let metadata = convert_proto_metadata_to_core(metadata, source_id)?;
701 let properties = convert_proto_properties(&relation.properties)?;
702
703 let in_node = relation.in_node.as_ref().ok_or_else(|| {
704 anyhow::anyhow!(
705 "Validation error: Relation missing required 'in_node' field. \
706 Relations must specify both source and target nodes."
707 )
708 })?;
709 let out_node = relation.out_node.as_ref().ok_or_else(|| {
710 anyhow::anyhow!(
711 "Validation error: Relation missing required 'out_node' field. \
712 Relations must specify both source and target nodes."
713 )
714 })?;
715
716 Ok(Element::Relation {
717 metadata,
718 properties,
719 in_node: ElementReference {
720 source_id: Arc::from(in_node.source_id.as_str()),
721 element_id: Arc::from(in_node.element_id.as_str()),
722 },
723 out_node: ElementReference {
724 source_id: Arc::from(out_node.source_id.as_str()),
725 element_id: Arc::from(out_node.element_id.as_str()),
726 },
727 })
728 }
729 None => Err(anyhow::anyhow!("Element type not specified")),
730 }
731}
732
733fn convert_proto_metadata_to_core(
735 proto_metadata: &proto::ElementMetadata,
736 source_id: &str,
737) -> Result<drasi_core::models::ElementMetadata> {
738 use drasi_core::models::{ElementMetadata, ElementReference};
739
740 let reference = proto_metadata
741 .reference
742 .as_ref()
743 .ok_or_else(|| anyhow::anyhow!("Metadata missing reference"))?;
744
745 Ok(ElementMetadata {
746 reference: ElementReference {
747 source_id: Arc::from(source_id),
748 element_id: Arc::from(reference.element_id.as_str()),
749 },
750 labels: Arc::from(
751 proto_metadata
752 .labels
753 .iter()
754 .map(|s| Arc::from(s.as_str()))
755 .collect::<Vec<_>>(),
756 ),
757 effective_from: proto_metadata.effective_from / 1_000_000, })
759}
760
761fn convert_proto_properties(
763 props: &Option<prost_types::Struct>,
764) -> Result<drasi_core::models::ElementPropertyMap> {
765 use drasi_core::models::ElementPropertyMap;
766
767 let mut properties = ElementPropertyMap::new();
768
769 if let Some(struct_props) = props {
770 for (key, value) in &struct_props.fields {
771 properties.insert(key, convert_proto_value_to_element_value(value)?);
772 }
773 }
774
775 Ok(properties)
776}
777
778fn convert_proto_value_to_element_value(
780 value: &prost_types::Value,
781) -> Result<drasi_core::models::ElementValue> {
782 use drasi_core::models::ElementValue;
783 use ordered_float::OrderedFloat;
784 use prost_types::value::Kind;
785
786 match &value.kind {
787 Some(Kind::NullValue(_)) => Ok(ElementValue::Null),
788 Some(Kind::BoolValue(b)) => Ok(ElementValue::Bool(*b)),
789 Some(Kind::NumberValue(n)) => {
790 if n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
791 Ok(ElementValue::Integer(*n as i64))
792 } else {
793 Ok(ElementValue::Float(OrderedFloat(*n)))
794 }
795 }
796 Some(Kind::StringValue(s)) => Ok(ElementValue::String(Arc::from(s.as_str()))),
797 Some(Kind::ListValue(_)) | Some(Kind::StructValue(_)) => {
798 let json_val = proto_value_to_json(value);
800 Ok(ElementValue::String(Arc::from(serde_json::to_string(
801 &json_val,
802 )?)))
803 }
804 None => Ok(ElementValue::Null),
805 }
806}
807
808fn proto_value_to_json(value: &prost_types::Value) -> serde_json::Value {
810 use prost_types::value::Kind;
811
812 match &value.kind {
813 Some(Kind::NullValue(_)) => serde_json::Value::Null,
814 Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
815 Some(Kind::NumberValue(n)) => serde_json::json!(*n),
816 Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
817 Some(Kind::ListValue(list)) => {
818 let arr: Vec<serde_json::Value> = list.values.iter().map(proto_value_to_json).collect();
819 serde_json::Value::Array(arr)
820 }
821 Some(Kind::StructValue(s)) => {
822 let mut map = serde_json::Map::new();
823 for (key, val) in &s.fields {
824 map.insert(key.clone(), proto_value_to_json(val));
825 }
826 serde_json::Value::Object(map)
827 }
828 None => serde_json::Value::Null,
829 }
830}
831
832pub struct GrpcSourceBuilder {
848 id: String,
849 host: String,
850 port: u16,
851 endpoint: Option<String>,
852 timeout_ms: u64,
853 dispatch_mode: Option<DispatchMode>,
854 dispatch_buffer_capacity: Option<usize>,
855 bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
856 auto_start: bool,
857}
858
859impl GrpcSourceBuilder {
860 pub fn new(id: impl Into<String>) -> Self {
862 Self {
863 id: id.into(),
864 host: "0.0.0.0".to_string(),
865 port: 50051,
866 endpoint: None,
867 timeout_ms: 5000,
868 dispatch_mode: None,
869 dispatch_buffer_capacity: None,
870 bootstrap_provider: None,
871 auto_start: true,
872 }
873 }
874
875 pub fn with_host(mut self, host: impl Into<String>) -> Self {
877 self.host = host.into();
878 self
879 }
880
881 pub fn with_port(mut self, port: u16) -> Self {
883 self.port = port;
884 self
885 }
886
887 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
889 self.endpoint = Some(endpoint.into());
890 self
891 }
892
893 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
895 self.timeout_ms = timeout_ms;
896 self
897 }
898
899 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
901 self.dispatch_mode = Some(mode);
902 self
903 }
904
905 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
907 self.dispatch_buffer_capacity = Some(capacity);
908 self
909 }
910
911 pub fn with_bootstrap_provider(
913 mut self,
914 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
915 ) -> Self {
916 self.bootstrap_provider = Some(Box::new(provider));
917 self
918 }
919
920 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
925 self.auto_start = auto_start;
926 self
927 }
928
929 pub fn with_config(mut self, config: GrpcSourceConfig) -> Self {
931 self.host = config.host;
932 self.port = config.port;
933 self.endpoint = config.endpoint;
934 self.timeout_ms = config.timeout_ms;
935 self
936 }
937
938 pub fn build(self) -> Result<GrpcSource> {
944 let config = GrpcSourceConfig {
945 host: self.host,
946 port: self.port,
947 endpoint: self.endpoint,
948 timeout_ms: self.timeout_ms,
949 };
950
951 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
952 if let Some(mode) = self.dispatch_mode {
953 params = params.with_dispatch_mode(mode);
954 }
955 if let Some(capacity) = self.dispatch_buffer_capacity {
956 params = params.with_dispatch_buffer_capacity(capacity);
957 }
958 if let Some(provider) = self.bootstrap_provider {
959 params = params.with_bootstrap_provider(provider);
960 }
961
962 Ok(GrpcSource {
963 base: SourceBase::new(params)?,
964 config,
965 })
966 }
967}
968
969#[cfg(feature = "dynamic-plugin")]
973drasi_plugin_sdk::export_plugin!(
974 plugin_id = "grpc-source",
975 core_version = env!("CARGO_PKG_VERSION"),
976 lib_version = env!("CARGO_PKG_VERSION"),
977 plugin_version = env!("CARGO_PKG_VERSION"),
978 source_descriptors = [descriptor::GrpcSourceDescriptor],
979 reaction_descriptors = [],
980 bootstrap_descriptors = [],
981);