use super::catalog::DataResource;
use super::identity::DapsClient;
use super::types::{IdsError, IdsResult, IdsUri, Party, SecurityProfile};
use chrono::{DateTime, Utc};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum BrokerMessageType {
ConnectorUpdateMessage,
ConnectorUnavailableMessage,
ResourceUpdateMessage,
ResourceUnavailableMessage,
QueryMessage,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BrokerMessage {
#[serde(rename = "@id")]
pub id: String,
#[serde(rename = "@type")]
pub message_type: BrokerMessageType,
pub issuer_connector: IdsUri,
pub issued: DateTime<Utc>,
pub security_token: Option<String>,
}
impl BrokerMessage {
pub fn new(message_type: BrokerMessageType, issuer: IdsUri) -> Self {
Self {
id: format!(
"urn:ids:message:{}",
Utc::now().timestamp_nanos_opt().unwrap_or(0)
),
message_type,
issuer_connector: issuer,
issued: Utc::now(),
security_token: None,
}
}
pub fn with_security_token(mut self, token: impl Into<String>) -> Self {
self.security_token = Some(token.into());
self
}
}
pub struct BrokerClient {
broker_url: String,
http_client: Client,
daps_client: Arc<DapsClient>,
connector_id: IdsUri,
catalog_cache: Arc<RwLock<Option<BrokerCatalog>>>,
cache_expiry: Arc<RwLock<Option<DateTime<Utc>>>>,
}
impl BrokerClient {
pub fn new(
broker_url: impl Into<String>,
connector_id: IdsUri,
daps_client: Arc<DapsClient>,
) -> Self {
Self {
broker_url: broker_url.into(),
http_client: Client::new(),
daps_client,
connector_id,
catalog_cache: Arc::new(RwLock::new(None)),
cache_expiry: Arc::new(RwLock::new(None)),
}
}
pub async fn register_connector(
&self,
self_description: &ConnectorSelfDescription,
) -> IdsResult<RegistrationResult> {
let token = self.daps_client.get_token(&self.connector_id).await?;
let message = BrokerMessage::new(
BrokerMessageType::ConnectorUpdateMessage,
self.connector_id.clone(),
)
.with_security_token(&token.access_token);
let response = self
.http_client
.post(format!("{}/infrastructure", self.broker_url))
.header("Authorization", format!("Bearer {}", token.access_token))
.header("Content-Type", "application/json")
.json(&BrokerRegistrationRequest {
message,
self_description: self_description.clone(),
})
.send()
.await
.map_err(|e| IdsError::MessageProtocolError(format!("Broker request failed: {}", e)))?;
if response.status().is_success() {
let result: RegistrationResult = response.json().await.map_err(|e| {
IdsError::SerializationError(format!("Failed to parse response: {}", e))
})?;
Ok(result)
} else {
Err(IdsError::MessageProtocolError(format!(
"Broker registration failed: {}",
response.status()
)))
}
}
pub async fn unregister_connector(&self) -> IdsResult<()> {
let token = self.daps_client.get_token(&self.connector_id).await?;
let message = BrokerMessage::new(
BrokerMessageType::ConnectorUnavailableMessage,
self.connector_id.clone(),
)
.with_security_token(&token.access_token);
let response = self
.http_client
.post(format!("{}/infrastructure", self.broker_url))
.header("Authorization", format!("Bearer {}", token.access_token))
.header("Content-Type", "application/json")
.json(&message)
.send()
.await
.map_err(|e| IdsError::MessageProtocolError(format!("Broker request failed: {}", e)))?;
if response.status().is_success() {
Ok(())
} else {
Err(IdsError::MessageProtocolError(format!(
"Broker unregistration failed: {}",
response.status()
)))
}
}
pub async fn publish_resource(&self, resource: &DataResource) -> IdsResult<()> {
let token = self.daps_client.get_token(&self.connector_id).await?;
let message = BrokerMessage::new(
BrokerMessageType::ResourceUpdateMessage,
self.connector_id.clone(),
)
.with_security_token(&token.access_token);
let response = self
.http_client
.post(format!("{}/data", self.broker_url))
.header("Authorization", format!("Bearer {}", token.access_token))
.header("Content-Type", "application/json")
.json(&BrokerResourceRequest {
message,
resource: resource.clone(),
})
.send()
.await
.map_err(|e| IdsError::MessageProtocolError(format!("Broker request failed: {}", e)))?;
if response.status().is_success() {
Ok(())
} else {
Err(IdsError::MessageProtocolError(format!(
"Resource publish failed: {}",
response.status()
)))
}
}
pub async fn unpublish_resource(&self, resource_id: &IdsUri) -> IdsResult<()> {
let token = self.daps_client.get_token(&self.connector_id).await?;
let message = BrokerMessage::new(
BrokerMessageType::ResourceUnavailableMessage,
self.connector_id.clone(),
)
.with_security_token(&token.access_token);
let response = self
.http_client
.delete(format!("{}/data/{}", self.broker_url, resource_id))
.header("Authorization", format!("Bearer {}", token.access_token))
.header("Content-Type", "application/json")
.json(&message)
.send()
.await
.map_err(|e| IdsError::MessageProtocolError(format!("Broker request failed: {}", e)))?;
if response.status().is_success() {
Ok(())
} else {
Err(IdsError::MessageProtocolError(format!(
"Resource unpublish failed: {}",
response.status()
)))
}
}
pub async fn query_catalog(&self, query: &CatalogQuery) -> IdsResult<Vec<BrokerResource>> {
let token = self.daps_client.get_token(&self.connector_id).await?;
let message =
BrokerMessage::new(BrokerMessageType::QueryMessage, self.connector_id.clone())
.with_security_token(&token.access_token);
let response = self
.http_client
.post(format!("{}/data/query", self.broker_url))
.header("Authorization", format!("Bearer {}", token.access_token))
.header("Content-Type", "application/json")
.json(&BrokerQueryRequest {
message,
query: query.clone(),
})
.send()
.await
.map_err(|e| IdsError::MessageProtocolError(format!("Broker request failed: {}", e)))?;
if response.status().is_success() {
let result: CatalogQueryResult = response.json().await.map_err(|e| {
IdsError::SerializationError(format!("Failed to parse response: {}", e))
})?;
Ok(result.resources)
} else {
Err(IdsError::MessageProtocolError(format!(
"Catalog query failed: {}",
response.status()
)))
}
}
pub async fn get_catalog(&self, force_refresh: bool) -> IdsResult<BrokerCatalog> {
if !force_refresh {
let cache = self.catalog_cache.read().await;
let expiry = self.cache_expiry.read().await;
if let (Some(catalog), Some(exp)) = (cache.as_ref(), expiry.as_ref()) {
if *exp > Utc::now() {
return Ok(catalog.clone());
}
}
}
let query = CatalogQuery::default();
let resources = self.query_catalog(&query).await?;
let catalog = BrokerCatalog {
broker_url: self.broker_url.clone(),
resources,
fetched_at: Utc::now(),
};
{
let mut cache = self.catalog_cache.write().await;
*cache = Some(catalog.clone());
let mut expiry = self.cache_expiry.write().await;
*expiry = Some(Utc::now() + chrono::Duration::minutes(5));
}
Ok(catalog)
}
pub async fn search(&self, keyword: &str) -> IdsResult<Vec<BrokerResource>> {
let query = CatalogQuery {
keyword: Some(keyword.to_string()),
..Default::default()
};
self.query_catalog(&query).await
}
pub async fn search_by_type(&self, content_type: &str) -> IdsResult<Vec<BrokerResource>> {
let query = CatalogQuery {
content_type: Some(content_type.to_string()),
..Default::default()
};
self.query_catalog(&query).await
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectorSelfDescription {
#[serde(rename = "@id")]
pub id: IdsUri,
#[serde(rename = "@type")]
pub connector_type: String,
pub title: String,
pub description: String,
pub security_profile: SecurityProfile,
pub curator: Party,
pub maintainer: Party,
pub resource_catalog: Vec<DataResource>,
pub endpoints: Vec<ConnectorEndpoint>,
pub inbound_model_version: Vec<String>,
pub outbound_model_version: String,
pub created_at: DateTime<Utc>,
pub modified_at: DateTime<Utc>,
}
impl ConnectorSelfDescription {
pub fn new(
id: IdsUri,
title: impl Into<String>,
description: impl Into<String>,
curator: Party,
) -> Self {
Self {
id,
connector_type: "ids:BaseConnector".to_string(),
title: title.into(),
description: description.into(),
security_profile: SecurityProfile::TrustSecurityProfile,
curator: curator.clone(),
maintainer: curator,
resource_catalog: Vec::new(),
endpoints: Vec::new(),
inbound_model_version: vec!["4.2.7".to_string()],
outbound_model_version: "4.2.7".to_string(),
created_at: Utc::now(),
modified_at: Utc::now(),
}
}
pub fn with_resource(mut self, resource: DataResource) -> Self {
self.resource_catalog.push(resource);
self
}
pub fn with_endpoint(mut self, endpoint: ConnectorEndpoint) -> Self {
self.endpoints.push(endpoint);
self
}
pub fn with_security_profile(mut self, profile: SecurityProfile) -> Self {
self.security_profile = profile;
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectorEndpoint {
#[serde(rename = "@id")]
pub id: IdsUri,
#[serde(rename = "@type")]
pub endpoint_type: String,
pub access_url: String,
pub endpoint_documentation: Option<String>,
pub path: Option<String>,
}
impl ConnectorEndpoint {
pub fn new(id: IdsUri, access_url: impl Into<String>) -> Self {
Self {
id,
endpoint_type: "ids:ConnectorEndpoint".to_string(),
access_url: access_url.into(),
endpoint_documentation: None,
path: None,
}
}
pub fn with_documentation(mut self, doc: impl Into<String>) -> Self {
self.endpoint_documentation = Some(doc.into());
self
}
pub fn with_path(mut self, path: impl Into<String>) -> Self {
self.path = Some(path.into());
self
}
}
#[derive(Debug, Serialize, Deserialize)]
struct BrokerRegistrationRequest {
message: BrokerMessage,
self_description: ConnectorSelfDescription,
}
#[derive(Debug, Serialize, Deserialize)]
struct BrokerResourceRequest {
message: BrokerMessage,
resource: DataResource,
}
#[derive(Debug, Serialize, Deserialize)]
struct BrokerQueryRequest {
message: BrokerMessage,
query: CatalogQuery,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegistrationResult {
pub success: bool,
pub registration_id: Option<String>,
pub message: Option<String>,
pub timestamp: DateTime<Utc>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CatalogQuery {
pub keyword: Option<String>,
pub content_type: Option<String>,
pub publisher: Option<IdsUri>,
pub language: Option<String>,
pub limit: Option<u32>,
pub offset: Option<u32>,
pub sparql: Option<String>,
}
impl CatalogQuery {
pub fn keyword(keyword: impl Into<String>) -> Self {
Self {
keyword: Some(keyword.into()),
..Default::default()
}
}
pub fn sparql(query: impl Into<String>) -> Self {
Self {
sparql: Some(query.into()),
..Default::default()
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CatalogQueryResult {
pub resources: Vec<BrokerResource>,
pub total: usize,
pub timestamp: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BrokerResource {
#[serde(rename = "@id")]
pub id: IdsUri,
pub title: String,
pub description: Option<String>,
pub publisher: IdsUri,
pub content_type: Option<String>,
pub language: Option<String>,
#[serde(default)]
pub keywords: Vec<String>,
pub access_url: Option<String>,
pub created_at: DateTime<Utc>,
pub modified_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BrokerCatalog {
pub broker_url: String,
pub resources: Vec<BrokerResource>,
pub fetched_at: DateTime<Utc>,
}
impl BrokerCatalog {
pub fn by_publisher(&self, publisher: &IdsUri) -> Vec<&BrokerResource> {
self.resources
.iter()
.filter(|r| &r.publisher == publisher)
.collect()
}
pub fn by_keyword(&self, keyword: &str) -> Vec<&BrokerResource> {
let kw_lower = keyword.to_lowercase();
self.resources
.iter()
.filter(|r| {
r.title.to_lowercase().contains(&kw_lower)
|| r.description
.as_ref()
.map(|d| d.to_lowercase().contains(&kw_lower))
.unwrap_or(false)
|| r.keywords
.iter()
.any(|k| k.to_lowercase().contains(&kw_lower))
})
.collect()
}
pub fn len(&self) -> usize {
self.resources.len()
}
pub fn is_empty(&self) -> bool {
self.resources.is_empty()
}
}
pub struct MultiBrokerManager {
brokers: Arc<RwLock<HashMap<String, Arc<BrokerClient>>>>,
primary_broker: Option<String>,
}
impl MultiBrokerManager {
pub fn new() -> Self {
Self {
brokers: Arc::new(RwLock::new(HashMap::new())),
primary_broker: None,
}
}
pub async fn add_broker(&self, name: impl Into<String>, client: BrokerClient) {
let mut brokers = self.brokers.write().await;
brokers.insert(name.into(), Arc::new(client));
}
pub fn set_primary(&mut self, name: impl Into<String>) {
self.primary_broker = Some(name.into());
}
pub async fn get_broker(&self, name: &str) -> Option<Arc<BrokerClient>> {
let brokers = self.brokers.read().await;
brokers.get(name).cloned()
}
pub async fn primary(&self) -> Option<Arc<BrokerClient>> {
if let Some(ref name) = self.primary_broker {
self.get_broker(name).await
} else {
None
}
}
pub async fn query_all(&self, query: &CatalogQuery) -> IdsResult<Vec<BrokerResource>> {
let brokers = self.brokers.read().await;
let mut all_resources = Vec::new();
for (name, client) in brokers.iter() {
match client.query_catalog(query).await {
Ok(resources) => {
tracing::info!("Got {} resources from broker {}", resources.len(), name);
all_resources.extend(resources);
}
Err(e) => {
tracing::warn!("Failed to query broker {}: {}", name, e);
}
}
}
let mut seen = std::collections::HashSet::new();
all_resources.retain(|r| seen.insert(r.id.as_str().to_string()));
Ok(all_resources)
}
pub async fn register_all(
&self,
self_description: &ConnectorSelfDescription,
) -> HashMap<String, IdsResult<RegistrationResult>> {
let brokers = self.brokers.read().await;
let mut results = HashMap::new();
for (name, client) in brokers.iter() {
let result = client.register_connector(self_description).await;
results.insert(name.clone(), result);
}
results
}
pub async fn publish_to_all(&self, resource: &DataResource) -> HashMap<String, IdsResult<()>> {
let brokers = self.brokers.read().await;
let mut results = HashMap::new();
for (name, client) in brokers.iter() {
let result = client.publish_resource(resource).await;
results.insert(name.clone(), result);
}
results
}
}
impl Default for MultiBrokerManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_party() -> Party {
Party {
id: IdsUri::new("https://example.org/party/1").expect("valid uri"),
name: "Test Party".to_string(),
legal_name: None,
description: None,
contact: None,
gaiax_participant_id: None,
}
}
#[test]
fn test_self_description_creation() {
let id = IdsUri::new("urn:ids:connector:test").expect("valid uri");
let sd = ConnectorSelfDescription::new(
id,
"Test Connector",
"A test IDS connector",
test_party(),
)
.with_security_profile(SecurityProfile::TrustPlusSecurityProfile);
assert_eq!(sd.title, "Test Connector");
assert_eq!(
sd.security_profile,
SecurityProfile::TrustPlusSecurityProfile
);
}
#[test]
fn test_endpoint_creation() {
let id = IdsUri::new("urn:ids:endpoint:1").expect("valid uri");
let endpoint = ConnectorEndpoint::new(id, "https://connector.example.org/api")
.with_path("/data")
.with_documentation("https://docs.example.org/api");
assert_eq!(endpoint.access_url, "https://connector.example.org/api");
assert_eq!(endpoint.path, Some("/data".to_string()));
}
#[test]
fn test_catalog_query() {
let query = CatalogQuery::keyword("sensor data").sparql.take();
let query = CatalogQuery {
keyword: Some("sensor data".to_string()),
limit: Some(10),
..Default::default()
};
assert_eq!(query.keyword, Some("sensor data".to_string()));
assert_eq!(query.limit, Some(10));
}
#[test]
fn test_broker_catalog_search() {
let catalog = BrokerCatalog {
broker_url: "https://broker.example.org".to_string(),
resources: vec![
BrokerResource {
id: IdsUri::new("urn:ids:resource:1").expect("valid uri"),
title: "Temperature Sensor Data".to_string(),
description: Some("Real-time temperature readings".to_string()),
publisher: IdsUri::new("urn:ids:connector:pub1").expect("valid uri"),
content_type: Some("application/json".to_string()),
language: Some("en".to_string()),
keywords: vec!["temperature".to_string(), "sensor".to_string()],
access_url: Some("https://example.org/data/temp".to_string()),
created_at: Utc::now(),
modified_at: Utc::now(),
},
BrokerResource {
id: IdsUri::new("urn:ids:resource:2").expect("valid uri"),
title: "Traffic Data".to_string(),
description: Some("City traffic patterns".to_string()),
publisher: IdsUri::new("urn:ids:connector:pub2").expect("valid uri"),
content_type: Some("application/json".to_string()),
language: Some("en".to_string()),
keywords: vec!["traffic".to_string(), "city".to_string()],
access_url: Some("https://example.org/data/traffic".to_string()),
created_at: Utc::now(),
modified_at: Utc::now(),
},
],
fetched_at: Utc::now(),
};
let results = catalog.by_keyword("temperature");
assert_eq!(results.len(), 1);
assert_eq!(results[0].title, "Temperature Sensor Data");
let results = catalog.by_keyword("sensor");
assert_eq!(results.len(), 1);
}
}