#![allow(unexpected_cfgs)]
pub mod config;
pub mod descriptor;
pub use config::PlatformSourceConfig;
use anyhow::Result;
use log::{debug, error, info, warn};
use redis::streams::StreamReadReply;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
use drasi_lib::channels::{
ComponentStatus, ControlOperation, DispatchMode, SourceControl, SourceEvent,
SourceEventWrapper, SubscriptionResponse,
};
use drasi_lib::component_graph::ComponentStatusHandle;
use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
use drasi_lib::sources::manager::convert_json_to_element_properties;
use drasi_lib::Source;
use tracing::Instrument;
#[cfg(test)]
mod tests;
/// Fully-resolved settings used by the Redis Stream consumer task.
///
/// Internal counterpart of the public `PlatformSourceConfig`: all optional
/// values have been defaulted and the consumer name resolved before the
/// consumer task is spawned.
#[derive(Debug, Clone)]
struct PlatformConfig {
// Redis connection URL, e.g. "redis://localhost:6379".
redis_url: String,
// Redis Stream key the consumer reads from.
stream_key: String,
// Consumer group name used with XREADGROUP / XACK.
consumer_group: String,
// Consumer name within the group; should be unique per instance.
consumer_name: String,
// Maximum entries fetched per XREADGROUP call (COUNT argument).
batch_size: usize,
// How long each XREADGROUP call blocks waiting for entries (BLOCK, ms).
block_ms: u64,
// Stream position the consumer group starts from (">" = new messages only).
start_id: String,
// When true, the consumer group is destroyed and re-created on startup.
always_create_consumer_group: bool,
// Connection attempts before giving up (see connect_with_retry).
max_retries: usize,
// Initial delay between connection attempts; doubled after each failure.
retry_delay_ms: u64,
}
impl Default for PlatformConfig {
fn default() -> Self {
Self {
redis_url: String::new(),
stream_key: String::new(),
consumer_group: String::new(),
consumer_name: String::new(),
batch_size: 10,
block_ms: 5000,
start_id: ">".to_string(),
always_create_consumer_group: false,
max_retries: 3,
retry_delay_ms: 1000,
}
}
}
/// A Drasi source that consumes change events from a Redis Stream and
/// dispatches them as `SourceEvent`s (data changes and control messages).
pub struct PlatformSource {
// Shared source plumbing: id, status handle, dispatchers, task handle.
base: SourceBase,
// Public configuration this source was built with.
config: PlatformSourceConfig,
}
/// Fluent builder for `PlatformSource`.
///
/// `redis_url` and `stream_key` are expected to be set; every other field is
/// optional and is defaulted in `build()`.
pub struct PlatformSourceBuilder {
// Source id, required at construction time.
id: String,
// Redis connection URL (empty until set).
redis_url: String,
// Redis Stream key to consume (empty until set).
stream_key: String,
// Consumer group name; defaults to "drasi-core" in build().
consumer_group: Option<String>,
// Consumer name; when None, one is derived from the source id at start().
consumer_name: Option<String>,
// XREADGROUP COUNT; defaults to 100 in build().
batch_size: Option<usize>,
// XREADGROUP BLOCK in ms; defaults to 5000 in build().
block_ms: Option<u64>,
// Optional dispatch mode override passed to SourceBaseParams.
dispatch_mode: Option<DispatchMode>,
// Optional dispatch buffer capacity override.
dispatch_buffer_capacity: Option<usize>,
// Optional bootstrap provider wired into the source base.
bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
// Whether the source starts automatically; defaults to true.
auto_start: bool,
}
impl PlatformSourceBuilder {
    /// Creates a builder for a platform source with the given id.
    ///
    /// Connection settings start unset; `auto_start` defaults to `true`.
    pub fn new(id: impl Into<String>) -> Self {
        Self {
            id: id.into(),
            redis_url: String::new(),
            stream_key: String::new(),
            consumer_group: None,
            consumer_name: None,
            batch_size: None,
            block_ms: None,
            dispatch_mode: None,
            dispatch_buffer_capacity: None,
            bootstrap_provider: None,
            auto_start: true,
        }
    }

    /// Sets the Redis connection URL.
    pub fn with_redis_url(self, url: impl Into<String>) -> Self {
        Self {
            redis_url: url.into(),
            ..self
        }
    }

    /// Sets the Redis Stream key to consume.
    pub fn with_stream_key(self, key: impl Into<String>) -> Self {
        Self {
            stream_key: key.into(),
            ..self
        }
    }

    /// Sets the consumer group name.
    pub fn with_consumer_group(self, group: impl Into<String>) -> Self {
        Self {
            consumer_group: Some(group.into()),
            ..self
        }
    }

    /// Sets the consumer name used within the group.
    pub fn with_consumer_name(self, name: impl Into<String>) -> Self {
        Self {
            consumer_name: Some(name.into()),
            ..self
        }
    }

    /// Sets how many entries are fetched per XREADGROUP call.
    pub fn with_batch_size(self, size: usize) -> Self {
        Self {
            batch_size: Some(size),
            ..self
        }
    }

    /// Sets how long each XREADGROUP call blocks, in milliseconds.
    pub fn with_block_ms(self, ms: u64) -> Self {
        Self {
            block_ms: Some(ms),
            ..self
        }
    }

    /// Overrides the event dispatch mode.
    pub fn with_dispatch_mode(self, mode: DispatchMode) -> Self {
        Self {
            dispatch_mode: Some(mode),
            ..self
        }
    }

    /// Overrides the dispatch buffer capacity.
    pub fn with_dispatch_buffer_capacity(self, capacity: usize) -> Self {
        Self {
            dispatch_buffer_capacity: Some(capacity),
            ..self
        }
    }

    /// Attaches a bootstrap provider to the source.
    pub fn with_bootstrap_provider(
        self,
        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
    ) -> Self {
        Self {
            bootstrap_provider: Some(Box::new(provider)),
            ..self
        }
    }

    /// Controls whether the source starts automatically.
    pub fn with_auto_start(self, auto_start: bool) -> Self {
        Self { auto_start, ..self }
    }

    /// Copies the connection settings from an existing `PlatformSourceConfig`.
    pub fn with_config(self, config: PlatformSourceConfig) -> Self {
        Self {
            redis_url: config.redis_url,
            stream_key: config.stream_key,
            consumer_group: Some(config.consumer_group),
            consumer_name: config.consumer_name,
            batch_size: Some(config.batch_size),
            block_ms: Some(config.block_ms),
            ..self
        }
    }

    /// Consumes the builder and constructs the source.
    ///
    /// Unset values are defaulted: consumer group "drasi-core", batch size
    /// 100, block time 5000 ms.
    pub fn build(self) -> Result<PlatformSource> {
        let Self {
            id,
            redis_url,
            stream_key,
            consumer_group,
            consumer_name,
            batch_size,
            block_ms,
            dispatch_mode,
            dispatch_buffer_capacity,
            bootstrap_provider,
            auto_start,
        } = self;
        let config = PlatformSourceConfig {
            redis_url,
            stream_key,
            consumer_group: consumer_group.unwrap_or_else(|| "drasi-core".to_string()),
            consumer_name,
            batch_size: batch_size.unwrap_or(100),
            block_ms: block_ms.unwrap_or(5000),
        };
        let mut params = SourceBaseParams::new(&id).with_auto_start(auto_start);
        if let Some(mode) = dispatch_mode {
            params = params.with_dispatch_mode(mode);
        }
        if let Some(capacity) = dispatch_buffer_capacity {
            params = params.with_dispatch_buffer_capacity(capacity);
        }
        if let Some(provider) = bootstrap_provider {
            params = params.with_bootstrap_provider(provider);
        }
        Ok(PlatformSource {
            base: SourceBase::new(params)?,
            config,
        })
    }
}
impl PlatformSource {
/// Convenience entry point: returns a builder for a source with this id.
pub fn builder(id: impl Into<String>) -> PlatformSourceBuilder {
PlatformSourceBuilder::new(id)
}
/// Creates a platform source directly from an id and a prepared
/// configuration, using default source-base (dispatch/auto-start) settings.
pub fn new(id: impl Into<String>, config: PlatformSourceConfig) -> Result<Self> {
    let base = SourceBase::new(SourceBaseParams::new(id.into()))?;
    Ok(Self { base, config })
}
#[allow(dead_code)]
fn parse_config(properties: &HashMap<String, Value>) -> Result<PlatformConfig> {
let redis_url = properties
.get("redis_url")
.and_then(|v| v.as_str())
.ok_or_else(|| {
anyhow::anyhow!(
"Configuration error: Missing required field 'redis_url'. \
Platform source requires a Redis connection URL"
)
})?
.to_string();
let stream_key = properties
.get("stream_key")
.and_then(|v| v.as_str())
.ok_or_else(|| {
anyhow::anyhow!(
"Configuration error: Missing required field 'stream_key'. \
Platform source requires a Redis Stream key to read from"
)
})?
.to_string();
let consumer_group = properties
.get("consumer_group")
.and_then(|v| v.as_str())
.ok_or_else(|| {
anyhow::anyhow!(
"Configuration error: Missing required field 'consumer_group'. \
Platform source requires a consumer group name"
)
})?
.to_string();
let consumer_name = properties
.get("consumer_name")
.and_then(|v| v.as_str())
.ok_or_else(|| {
anyhow::anyhow!(
"Configuration error: Missing required field 'consumer_name'. \
Platform source requires a unique consumer name"
)
})?
.to_string();
let defaults = PlatformConfig::default();
let config = PlatformConfig {
redis_url,
stream_key,
consumer_group,
consumer_name,
batch_size: properties
.get("batch_size")
.and_then(|v| v.as_u64())
.map(|v| v as usize)
.unwrap_or(defaults.batch_size),
block_ms: properties
.get("block_ms")
.and_then(|v| v.as_u64())
.unwrap_or(defaults.block_ms),
start_id: properties
.get("start_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or(defaults.start_id),
always_create_consumer_group: properties
.get("always_create_consumer_group")
.and_then(|v| v.as_bool())
.unwrap_or(defaults.always_create_consumer_group),
max_retries: properties
.get("max_retries")
.and_then(|v| v.as_u64())
.map(|v| v as usize)
.unwrap_or(defaults.max_retries),
retry_delay_ms: properties
.get("retry_delay_ms")
.and_then(|v| v.as_u64())
.unwrap_or(defaults.retry_delay_ms),
};
if config.redis_url.is_empty() {
return Err(anyhow::anyhow!(
"Validation error: redis_url cannot be empty. \
Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
));
}
if config.stream_key.is_empty() {
return Err(anyhow::anyhow!(
"Validation error: stream_key cannot be empty. \
Please specify the Redis Stream key to read from"
));
}
if config.consumer_group.is_empty() {
return Err(anyhow::anyhow!(
"Validation error: consumer_group cannot be empty. \
Please specify a consumer group name for this source"
));
}
if config.consumer_name.is_empty() {
return Err(anyhow::anyhow!(
"Validation error: consumer_name cannot be empty. \
Please specify a unique consumer name within the consumer group"
));
}
Ok(config)
}
/// Connects to Redis with exponential-backoff retry.
///
/// Tries up to `max_retries` times (a value of 0 is treated as a single
/// attempt), doubling the delay — starting at `retry_delay_ms` — after each
/// failure.
///
/// # Errors
/// Returns an error if the URL is invalid or every connection attempt fails.
async fn connect_with_retry(
    redis_url: &str,
    max_retries: usize,
    retry_delay_ms: u64,
) -> Result<redis::aio::MultiplexedConnection> {
    let client = redis::Client::open(redis_url)?;
    // Clamp to at least one attempt so the loop below always runs;
    // max_retries == 0 previously fell through to an unreachable!() panic.
    let attempts = max_retries.max(1);
    let mut delay = retry_delay_ms;
    for attempt in 0..attempts {
        match client.get_multiplexed_async_connection().await {
            Ok(conn) => {
                info!("Successfully connected to Redis");
                return Ok(conn);
            }
            Err(e) if attempt + 1 < attempts => {
                warn!(
                    "Redis connection failed (attempt {}/{}): {}",
                    attempt + 1,
                    attempts,
                    e
                );
                tokio::time::sleep(Duration::from_millis(delay)).await;
                // Exponential backoff; saturate instead of overflowing on
                // very large delays.
                delay = delay.saturating_mul(2);
            }
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "Failed to connect to Redis after {attempts} attempts: {e}"
                ));
            }
        }
    }
    // Every iteration either returns Ok or Err, and attempts >= 1.
    unreachable!("connect_with_retry loop always returns")
}
/// Ensures the consumer group exists on the stream.
///
/// When `always_create` is true, the group is first destroyed (best effort,
/// errors logged and ignored) so consumption restarts from `start_id`. A
/// BUSYGROUP error from XGROUP CREATE is treated as success: the group
/// already exists and consumption resumes from its last position.
async fn create_consumer_group(
conn: &mut redis::aio::MultiplexedConnection,
stream_key: &str,
consumer_group: &str,
start_id: &str,
always_create: bool,
) -> Result<()> {
// ">" is an XREADGROUP-only cursor; for group creation it maps to "$"
// (deliver only entries added after the group is created).
let group_start_id = if start_id == ">" {
"$" } else {
start_id };
if always_create {
info!(
"always_create_consumer_group=true, deleting consumer group '{consumer_group}' if it exists"
);
// Best-effort destroy; XGROUP DESTROY returns 1 if removed, 0 if absent.
let destroy_result: Result<i64, redis::RedisError> = redis::cmd("XGROUP")
.arg("DESTROY")
.arg(stream_key)
.arg(consumer_group)
.query_async(conn)
.await;
match destroy_result {
Ok(1) => info!("Successfully deleted consumer group '{consumer_group}'"),
Ok(0) => debug!("Consumer group '{consumer_group}' did not exist"),
Ok(n) => warn!("Unexpected result from XGROUP DESTROY: {n}"),
Err(e) => warn!("Error deleting consumer group (will continue): {e}"),
}
}
// MKSTREAM creates the stream if it does not exist yet.
let result: Result<String, redis::RedisError> = redis::cmd("XGROUP")
.arg("CREATE")
.arg(stream_key)
.arg(consumer_group)
.arg(group_start_id)
.arg("MKSTREAM")
.query_async(conn)
.await;
match result {
Ok(_) => {
info!(
"Created consumer group '{consumer_group}' for stream '{stream_key}' at position '{group_start_id}'"
);
Ok(())
}
Err(e) => {
// BUSYGROUP means the group already exists — not an error for us.
if e.to_string().contains("BUSYGROUP") {
info!(
"Consumer group '{consumer_group}' already exists for stream '{stream_key}', will resume from last position"
);
Ok(())
} else {
Err(anyhow::anyhow!("Failed to create consumer group: {e}"))
}
}
}
}
/// Spawns the long-running consumer task for this source.
///
/// The task: connects to Redis (with retry), ensures the consumer group
/// exists, then loops forever reading batches via XREADGROUP, decoding each
/// entry as a CloudEvent, dispatching control or data events to the
/// subscribers, and acknowledging processed entries with XACK. On connection
/// loss it attempts to reconnect; on unrecoverable setup errors it reports
/// `Stopped` and exits. The returned `JoinHandle` is aborted by `stop()`.
async fn start_consumer_task(
source_id: String,
instance_id: String,
platform_config: PlatformConfig,
dispatchers: Arc<
RwLock<
Vec<
Box<
dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
>,
>,
>,
>,
reporter: ComponentStatusHandle,
) -> JoinHandle<()> {
let source_id_for_span = source_id.clone();
// Tracing span so all task logs carry the instance/component identifiers.
let span = tracing::info_span!(
"platform_source_consumer",
instance_id = %instance_id,
component_id = %source_id_for_span,
component_type = "source"
);
tokio::spawn(async move {
info!(
"Starting platform source consumer for source '{}' on stream '{}'",
source_id, platform_config.stream_key
);
// Phase 1: establish the Redis connection; give up (Stopped) on failure.
let mut conn = match Self::connect_with_retry(
&platform_config.redis_url,
platform_config.max_retries,
platform_config.retry_delay_ms,
)
.await
{
Ok(conn) => conn,
Err(e) => {
error!("Failed to connect to Redis: {e}");
reporter.set_status(
ComponentStatus::Stopped,
Some(format!("Failed to connect to Redis: {e}")),
).await;
return;
}
};
// Phase 2: ensure the consumer group exists before reading.
if let Err(e) = Self::create_consumer_group(
&mut conn,
&platform_config.stream_key,
&platform_config.consumer_group,
&platform_config.start_id,
platform_config.always_create_consumer_group,
)
.await
{
error!("Failed to create consumer group: {e}");
reporter.set_status(
ComponentStatus::Stopped,
Some(format!("Failed to create consumer group: {e}")),
).await;
return;
}
// Phase 3: main read/dispatch/ack loop.
loop {
// Blocking batched read of new (">") messages for this consumer.
let read_result: Result<StreamReadReply, redis::RedisError> =
redis::cmd("XREADGROUP")
.arg("GROUP")
.arg(&platform_config.consumer_group)
.arg(&platform_config.consumer_name)
.arg("COUNT")
.arg(platform_config.batch_size)
.arg("BLOCK")
.arg(platform_config.block_ms)
.arg("STREAMS")
.arg(&platform_config.stream_key)
.arg(">") .query_async(&mut conn)
.await;
match read_result {
Ok(reply) => {
// Collect processed entry ids so the whole batch can be ACKed once.
let mut all_stream_ids = Vec::new();
for stream_key in reply.keys {
for stream_id in stream_key.ids {
debug!("Received event from stream: {}", stream_id.id);
all_stream_ids.push(stream_id.id.clone());
// Decode the raw entry into a CloudEvent JSON document.
match extract_event_data(&stream_id.map) {
Ok(event_json) => {
match serde_json::from_str::<Value>(&event_json) {
Ok(cloud_event) => {
// Route by message type: Drasi control vs. data change.
let message_type =
detect_message_type(&cloud_event);
match message_type {
MessageType::Control(control_type) => {
debug!(
"Detected control message of type: {control_type}"
);
match transform_control_event(
cloud_event,
&control_type,
) {
Ok(control_events) => {
for control_event in control_events
{
// Stamp dispatch-time profiling metadata.
let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
let wrapper = SourceEventWrapper::with_profiling(
source_id.clone(),
SourceEvent::Control(control_event),
chrono::Utc::now(),
profiling,
);
// Dispatch failure (no subscribers) is non-fatal.
if let Err(e) = SourceBase::dispatch_from_task(
dispatchers.clone(),
wrapper,
&source_id,
)
.await
{
debug!("[{source_id}] Failed to dispatch control event (no subscribers): {e}");
} else {
debug!(
"Published control event for stream {}",
stream_id.id
);
}
}
}
Err(e) => {
warn!(
"Failed to transform control event {}: {}",
stream_id.id, e
);
}
}
}
MessageType::Data => {
match transform_platform_event(
cloud_event,
&source_id,
) {
Ok(source_changes_with_timestamps) => {
for item in
source_changes_with_timestamps
{
// Carry reactivator timestamps through to the
// profiling metadata on the wrapper.
let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
profiling.source_ns = Some(
item.source_change
.get_transaction_time(),
);
profiling
.reactivator_start_ns =
item.reactivator_start_ns;
profiling.reactivator_end_ns =
item.reactivator_end_ns;
let wrapper = SourceEventWrapper::with_profiling(
source_id.clone(),
SourceEvent::Change(item.source_change),
chrono::Utc::now(),
profiling,
);
if let Err(e) = SourceBase::dispatch_from_task(
dispatchers.clone(),
wrapper,
&source_id,
)
.await
{
debug!("[{source_id}] Failed to dispatch change (no subscribers): {e}");
} else {
debug!(
"Published source change for event {}",
stream_id.id
);
}
}
}
Err(e) => {
warn!(
"Failed to transform event {}: {}",
stream_id.id, e
);
// Keep running: a bad event is reported but not fatal.
reporter.set_status(
ComponentStatus::Running,
Some(format!(
"Transformation error: {e}"
)),
).await;
}
}
}
}
}
Err(e) => {
warn!(
"Failed to parse JSON for event {}: {}",
stream_id.id, e
);
}
}
}
Err(e) => {
warn!(
"Failed to extract event data from {}: {}",
stream_id.id, e
);
}
}
}
}
// Acknowledge the batch in one XACK; malformed entries are ACKed
// too so they are not redelivered forever.
if !all_stream_ids.is_empty() {
debug!("Acknowledging batch of {} messages", all_stream_ids.len());
let mut cmd = redis::cmd("XACK");
cmd.arg(&platform_config.stream_key)
.arg(&platform_config.consumer_group);
for stream_id in &all_stream_ids {
cmd.arg(stream_id);
}
match cmd.query_async::<_, i64>(&mut conn).await {
Ok(ack_count) => {
debug!("Successfully acknowledged {ack_count} messages");
if ack_count as usize != all_stream_ids.len() {
warn!(
"Acknowledged {} messages but expected {}",
ack_count,
all_stream_ids.len()
);
}
}
Err(e) => {
// If the batched XACK fails, retry entries one at a time.
error!("Failed to acknowledge message batch: {e}");
warn!("Falling back to individual acknowledgments");
for stream_id in &all_stream_ids {
match redis::cmd("XACK")
.arg(&platform_config.stream_key)
.arg(&platform_config.consumer_group)
.arg(stream_id)
.query_async::<_, i64>(&mut conn)
.await
{
Ok(_) => {
debug!(
"Individually acknowledged message {stream_id}"
);
}
Err(e) => {
error!(
"Failed to individually acknowledge message {stream_id}: {e}"
);
}
}
}
}
}
}
}
Err(e) => {
// Reconnect on connection loss; ignore read timeouts (normal when
// the stream is idle); log anything else and keep looping.
if is_connection_error(&e) {
error!("Redis connection lost: {e}");
reporter.set_status(
ComponentStatus::Running,
Some(format!("Redis connection lost: {e}")),
).await;
match Self::connect_with_retry(
&platform_config.redis_url,
platform_config.max_retries,
platform_config.retry_delay_ms,
)
.await
{
Ok(new_conn) => {
conn = new_conn;
info!("Reconnected to Redis");
}
Err(e) => {
error!("Failed to reconnect to Redis: {e}");
reporter.set_status(ComponentStatus::Stopped, None).await;
return;
}
}
} else if !e.to_string().contains("timeout") {
error!("Error reading from stream: {e}");
}
}
}
}
}.instrument(span))
}
}
#[async_trait::async_trait]
impl Source for PlatformSource {
/// The source's unique identifier.
fn id(&self) -> &str {
&self.base.id
}
/// The source type name used for registration/diagnostics.
fn type_name(&self) -> &str {
"platform"
}
/// Serializes the current configuration to a property map via the
/// descriptor DTO; returns an empty map if serialization fails.
fn properties(&self) -> HashMap<String, serde_json::Value> {
use crate::descriptor::PlatformSourceConfigDto;
use drasi_plugin_sdk::ConfigValue;
let dto = PlatformSourceConfigDto {
redis_url: ConfigValue::Static(self.config.redis_url.clone()),
stream_key: ConfigValue::Static(self.config.stream_key.clone()),
consumer_group: ConfigValue::Static(self.config.consumer_group.clone()),
consumer_name: self
.config
.consumer_name
.as_ref()
.map(|n| ConfigValue::Static(n.clone())),
batch_size: ConfigValue::Static(self.config.batch_size),
block_ms: ConfigValue::Static(self.config.block_ms),
};
match serde_json::to_value(&dto) {
Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
_ => HashMap::new(),
}
}
/// Whether this source starts automatically when the host starts.
fn auto_start(&self) -> bool {
self.base.get_auto_start()
}
/// Resolves the runtime config, marks the source Running, and spawns the
/// consumer task, storing its handle for later shutdown.
async fn start(&self) -> Result<()> {
info!("Starting platform source: {}", self.base.id);
// Resolve optional fields into the concrete runtime configuration.
// NOTE(review): start_id/always_create/max_retries/retry_delay are
// fixed here rather than taken from PlatformSourceConfig.
let platform_config = PlatformConfig {
redis_url: self.config.redis_url.clone(),
stream_key: self.config.stream_key.clone(),
consumer_group: self.config.consumer_group.clone(),
consumer_name: self
.config
.consumer_name
.clone()
.unwrap_or_else(|| format!("drasi-consumer-{}", self.base.id)),
batch_size: self.config.batch_size,
block_ms: self.config.block_ms,
start_id: ">".to_string(),
always_create_consumer_group: false,
max_retries: 5,
retry_delay_ms: 1000,
};
self.base
.set_status(
ComponentStatus::Running,
Some("Platform source running".to_string()),
)
.await;
let instance_id = self
.base
.context()
.await
.map(|c| c.instance_id)
.unwrap_or_default();
let task = Self::start_consumer_task(
self.base.id.clone(),
instance_id,
platform_config,
self.base.dispatchers.clone(),
self.base.status_handle(),
)
.await;
*self.base.task_handle.write().await = Some(task);
Ok(())
}
/// Aborts the consumer task (if running) and marks the source Stopped.
async fn stop(&self) -> Result<()> {
info!("Stopping platform source: {}", self.base.id);
if let Some(handle) = self.base.task_handle.write().await.take() {
handle.abort();
}
self.base
.set_status(
ComponentStatus::Stopped,
Some("Platform source stopped".to_string()),
)
.await;
Ok(())
}
/// Current component status, delegated to the source base.
async fn status(&self) -> ComponentStatus {
self.base.get_status().await
}
/// Subscribes a query to this source, including bootstrap handling.
async fn subscribe(
&self,
settings: drasi_lib::config::SourceSubscriptionSettings,
) -> Result<SubscriptionResponse> {
self.base
.subscribe_with_bootstrap(&settings, "Platform")
.await
}
/// Enables downcasting to the concrete source type.
fn as_any(&self) -> &dyn std::any::Any {
self
}
/// Installs the runtime context on the source base.
async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
self.base.initialize(context).await;
}
/// Replaces the bootstrap provider on the source base.
async fn set_bootstrap_provider(
&self,
provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
) {
self.base.set_bootstrap_provider(provider).await;
}
}
impl PlatformSource {
/// Test helper: synchronous subscription to the raw event stream.
pub fn test_subscribe(
&self,
) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
self.base.test_subscribe()
}
/// Test helper: async streaming subscription to the raw event stream.
///
/// # Panics
/// Panics if the streaming receiver cannot be created.
pub async fn test_subscribe_async(
&self,
) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
self.base
.create_streaming_receiver()
.await
.expect("Failed to create test subscription")
}
}
/// Pulls the event payload out of a stream entry as a UTF-8 string.
///
/// Checks the well-known field names in order ("data", "event", "payload",
/// "message") and returns the first bulk-string value found; errors if none
/// of the candidate keys is present or the bytes are not valid UTF-8.
fn extract_event_data(entry_map: &HashMap<String, redis::Value>) -> Result<String> {
    const CANDIDATE_KEYS: [&str; 4] = ["data", "event", "payload", "message"];
    let bytes = CANDIDATE_KEYS.iter().find_map(|key| match entry_map.get(*key) {
        Some(redis::Value::Data(bytes)) => Some(bytes),
        _ => None,
    });
    match bytes {
        Some(bytes) => String::from_utf8(bytes.clone())
            .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in event data: {e}")),
        None => Err(anyhow::anyhow!(
            "No event data found in stream entry. Available keys: {:?}",
            entry_map.keys().collect::<Vec<_>>()
        )),
    }
}
/// Heuristically classifies a Redis error as a (recoverable) connection
/// failure: dropped connections, I/O errors, or messages mentioning
/// "connection" or "EOF".
fn is_connection_error(e: &redis::RedisError) -> bool {
    if e.is_connection_dropped() || e.is_io_error() {
        return true;
    }
    let message = e.to_string();
    message.contains("connection") || message.contains("EOF")
}
/// Classification of a CloudEvent read from the stream.
#[derive(Debug, Clone, PartialEq)]
enum MessageType {
// A Drasi control message; the string is the control type taken from the
// payload's source table (e.g. "SourceSubscription").
Control(String),
// A regular data-change event.
Data,
}
/// Inspects a CloudEvent and decides whether it carries a Drasi control
/// message or regular change data.
///
/// The first entry of the "data" array is examined: if its
/// `payload.source.db` equals "drasi" (case-insensitive) the event is a
/// control message whose type is `payload.source.table` (or "Unknown");
/// anything else — including a missing or empty "data" array — is data.
fn detect_message_type(cloud_event: &Value) -> MessageType {
    let first_event = match cloud_event["data"].as_array().and_then(|arr| arr.first()) {
        Some(event) => event,
        None => return MessageType::Data,
    };
    match first_event["payload"]["source"]["db"].as_str() {
        Some(db) if db.eq_ignore_ascii_case("drasi") => {
            let control_type = first_event["payload"]["source"]["table"]
                .as_str()
                .unwrap_or("Unknown")
                .to_string();
            MessageType::Control(control_type)
        }
        _ => MessageType::Data,
    }
}
/// A decoded source change together with optional reactivator profiling
/// timestamps carried through from the originating event.
#[derive(Debug)]
struct SourceChangeWithTimestamps {
// The decoded insert/update/delete change.
source_change: SourceChange,
// Taken from the event's "reactivatorStart_ns" field, when present.
reactivator_start_ns: Option<u64>,
// Taken from the event's "reactivatorEnd_ns" field, when present.
reactivator_end_ns: Option<u64>,
}
/// Converts a CloudEvent's "data" array into Drasi `SourceChange`s.
///
/// Each entry must carry a Debezium-style envelope: an "op" ("i"/"u"/"d"),
/// a "payload" with "after" (insert/update) or "before" (delete) element
/// data, element "id" and "labels", and a source timestamp "ts_ns". Nodes
/// and relations ("node" vs "rel"/"relation" in payload.source.table) are
/// both supported; relations additionally require "startId"/"endId".
///
/// # Errors
/// Fails fast on the first malformed entry (missing/invalid field, unknown
/// op, or unknown element type).
fn transform_platform_event(
cloud_event: Value,
source_id: &str,
) -> Result<Vec<SourceChangeWithTimestamps>> {
let mut source_changes = Vec::new();
let data_array = cloud_event["data"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
for event in data_array {
// Optional reactivator timing, forwarded into profiling metadata.
let reactivator_start_ns = event["reactivatorStart_ns"].as_u64();
let reactivator_end_ns = event["reactivatorEnd_ns"].as_u64();
let op = event["op"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field"))?;
let payload = &event["payload"];
if payload.is_null() {
return Err(anyhow::anyhow!("Missing 'payload' field"));
}
let element_type = payload["source"]["table"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.table' field"))?;
// Inserts/updates describe the new state; deletes the prior state.
let element_data = match op {
"i" | "u" => &payload["after"],
"d" => &payload["before"],
_ => return Err(anyhow::anyhow!("Unknown operation type: {op}")),
};
if element_data.is_null() {
return Err(anyhow::anyhow!(
"Missing element data (after/before) for operation {op}"
));
}
let element_id = element_data["id"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing or invalid element 'id' field"))?;
let labels_array = element_data["labels"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("Missing or invalid 'labels' field"))?;
// Non-string entries are silently dropped; an all-invalid array errors.
let labels: Vec<Arc<str>> = labels_array
.iter()
.filter_map(|v| v.as_str().map(Arc::from))
.collect();
if labels.is_empty() {
return Err(anyhow::anyhow!("Labels array is empty or invalid"));
}
let ts_ns = payload["source"]["ts_ns"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.ts_ns' field"))?;
// Nanoseconds -> milliseconds for the element's effective_from.
let effective_from = ts_ns / 1_000_000;
let reference = ElementReference::new(source_id, element_id);
let metadata = ElementMetadata {
reference,
labels: labels.into(),
effective_from,
};
// Deletes carry only metadata — no properties to decode.
if op == "d" {
source_changes.push(SourceChangeWithTimestamps {
source_change: SourceChange::Delete { metadata },
reactivator_start_ns,
reactivator_end_ns,
});
continue;
}
let properties_obj = element_data["properties"]
.as_object()
.ok_or_else(|| anyhow::anyhow!("Missing or invalid 'properties' field"))?;
let properties = convert_json_to_element_properties(properties_obj);
let element = match element_type {
"node" => Element::Node {
metadata,
properties,
},
"rel" | "relation" => {
let start_id = element_data["startId"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing or invalid 'startId' for relation"))?;
let end_id = element_data["endId"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing or invalid 'endId' for relation"))?;
Element::Relation {
metadata,
properties,
in_node: ElementReference::new(source_id, start_id),
out_node: ElementReference::new(source_id, end_id),
}
}
_ => return Err(anyhow::anyhow!("Unknown element type: {element_type}")),
};
// op is guaranteed "i" or "u" here; "d" was handled above.
let source_change = match op {
"i" => SourceChange::Insert { element },
"u" => SourceChange::Update { element },
_ => unreachable!(),
};
source_changes.push(SourceChangeWithTimestamps {
source_change,
reactivator_start_ns,
reactivator_end_ns,
});
}
Ok(source_changes)
}
/// Converts a Drasi control CloudEvent into `SourceControl` messages.
///
/// Only the "SourceSubscription" control type is supported; any other type
/// yields an empty vector. Unlike `transform_platform_event`, malformed
/// entries are logged and skipped rather than failing the whole batch.
///
/// # Errors
/// Fails only when the CloudEvent has no "data" array or an entry lacks a
/// valid "op" field.
fn transform_control_event(cloud_event: Value, control_type: &str) -> Result<Vec<SourceControl>> {
let mut control_events = Vec::new();
let data_array = cloud_event["data"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
if control_type != "SourceSubscription" {
info!(
"Skipping unknown control type '{control_type}' (only 'SourceSubscription' is supported)"
);
return Ok(control_events); }
for event in data_array {
let op = event["op"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field in control event"))?;
let payload = &event["payload"];
if payload.is_null() {
warn!("Missing 'payload' field in control event, skipping");
continue;
}
// Inserts/updates read "after"; deletes read "before".
let control_data = match op {
"i" | "u" => &payload["after"],
"d" => &payload["before"],
_ => {
warn!("Unknown operation type in control event: {op}, skipping");
continue;
}
};
if control_data.is_null() {
warn!("Missing control data (after/before) for operation {op}, skipping");
continue;
}
let query_id = match control_data["queryId"].as_str() {
Some(id) => id.to_string(),
None => {
warn!("Missing required 'queryId' field in control event, skipping");
continue;
}
};
let query_node_id = match control_data["queryNodeId"].as_str() {
Some(id) => id.to_string(),
None => {
warn!("Missing required 'queryNodeId' field in control event, skipping");
continue;
}
};
// Label filters are optional; absent arrays become empty lists.
let node_labels = control_data["nodeLabels"]
.as_array()
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let rel_labels = control_data["relLabels"]
.as_array()
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
// op was validated above, so only i/u/d can reach this match.
let operation = match op {
"i" => ControlOperation::Insert,
"u" => ControlOperation::Update,
"d" => ControlOperation::Delete,
_ => unreachable!(), };
let control_event = SourceControl::Subscription {
query_id,
query_node_id,
node_labels,
rel_labels,
operation,
};
control_events.push(control_event);
}
Ok(control_events)
}
// When built as a dynamic plugin, export the plugin entry points expected by
// the Drasi plugin SDK, registering this crate's source descriptor.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
plugin_id = "platform-source",
core_version = env!("CARGO_PKG_VERSION"),
lib_version = env!("CARGO_PKG_VERSION"),
plugin_version = env!("CARGO_PKG_VERSION"),
source_descriptors = [descriptor::PlatformSourceDescriptor],
reaction_descriptors = [],
bootstrap_descriptors = [],
);