use super::subscription::SubscriptionManager;
use super::types::{NodeSubscription, OpcUaConfig, OpcUaDataChange, OpcUaStats};
use crate::error::StreamResult;
use crate::event::{EventMetadata, StreamEvent};
use chrono::Utc;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::info;
pub struct OpcUaClient {
config: OpcUaConfig,
subscription_manager: Arc<RwLock<SubscriptionManager>>,
stats: Arc<RwLock<OpcUaStats>>,
event_sender: broadcast::Sender<StreamEvent>,
connected: Arc<RwLock<bool>>,
subscriptions: Arc<RwLock<Vec<NodeSubscription>>>,
}
impl OpcUaClient {
pub fn new(config: OpcUaConfig) -> Self {
let (tx, _) = broadcast::channel(10000);
Self {
config,
subscription_manager: Arc::new(RwLock::new(SubscriptionManager::new())),
stats: Arc::new(RwLock::new(OpcUaStats::default())),
event_sender: tx,
connected: Arc::new(RwLock::new(false)),
subscriptions: Arc::new(RwLock::new(Vec::new())),
}
}
pub async fn connect(&mut self) -> StreamResult<()> {
*self.connected.write().await = true;
{
let mut stats = self.stats.write().await;
stats.session_count += 1;
stats.last_connected_at = Some(Utc::now());
}
info!("OPC UA client connected to {}", self.config.endpoint_url);
Ok(())
}
pub async fn disconnect(&mut self) -> StreamResult<()> {
*self.connected.write().await = false;
{
let mut stats = self.stats.write().await;
stats.last_disconnected_at = Some(Utc::now());
}
info!("OPC UA client disconnected");
Ok(())
}
pub async fn subscribe_nodes(&self, subscriptions: Vec<NodeSubscription>) -> StreamResult<()> {
{
let mut stats = self.stats.write().await;
stats.subscription_count += 1;
stats.monitored_items_count += subscriptions.len() as u64;
}
*self.subscriptions.write().await = subscriptions;
Ok(())
}
fn data_change_to_event(
&self,
change: &OpcUaDataChange,
mapping: &NodeSubscription,
) -> StreamEvent {
let metadata = EventMetadata {
event_id: uuid::Uuid::new_v4().to_string(),
timestamp: change.source_timestamp.unwrap_or(change.server_timestamp),
source: format!("opcua:{}", self.config.endpoint_url),
user: None,
context: mapping.rdf_graph.clone(),
caused_by: None,
version: "1.0".to_string(),
properties: HashMap::new(),
checksum: None,
};
let object = change.value.to_rdf_literal();
StreamEvent::TripleAdded {
subject: mapping.rdf_subject.clone(),
predicate: mapping.rdf_predicate.clone(),
object,
graph: mapping.rdf_graph.clone(),
metadata,
}
}
pub async fn get_stats(&self) -> OpcUaStats {
self.stats.read().await.clone()
}
pub async fn is_connected(&self) -> bool {
*self.connected.read().await
}
pub fn subscribe_events(&self) -> broadcast::Receiver<StreamEvent> {
self.event_sender.subscribe()
}
}
pub struct OpcUaBackend {
client: Arc<OpcUaClient>,
}
impl OpcUaBackend {
pub fn new(config: OpcUaConfig) -> Self {
Self {
client: Arc::new(OpcUaClient::new(config)),
}
}
pub fn client(&self) -> Arc<OpcUaClient> {
Arc::clone(&self.client)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_opcua_client_creation() {
let config = OpcUaConfig::default();
let client = OpcUaClient::new(config);
let rt = tokio::runtime::Runtime::new().expect("runtime creation failed");
assert!(!rt.block_on(client.is_connected()));
}
}