#[cfg(feature = "nats-transport")]
use serde::{de::DeserializeOwned, Serialize};
const PREFIX: &str = "varpulis.cluster";
pub fn subject_register() -> String {
format!("{PREFIX}.register")
}
pub fn subject_heartbeat(worker_id: &str) -> String {
format!("{PREFIX}.heartbeat.{worker_id}")
}
pub fn subject_cmd(worker_id: &str, cmd: &str) -> String {
format!("{PREFIX}.cmd.{worker_id}.{cmd}")
}
pub fn subject_cmd_wildcard(worker_id: &str) -> String {
format!("{PREFIX}.cmd.{worker_id}.>")
}
pub fn subject_pipeline(group: &str, from: &str, to: &str) -> String {
format!("{PREFIX}.pipeline.{group}.{from}.{to}")
}
pub fn subject_raft(node_id: u64, rpc: &str) -> String {
format!("{PREFIX}.raft.{node_id}.{rpc}")
}
pub fn subject_raft_wildcard(node_id: u64) -> String {
format!("{PREFIX}.raft.{node_id}.>")
}
const FEDERATION_PREFIX: &str = "varpulis.federation";
pub fn subject_federation_heartbeat(region: &str) -> String {
format!("{FEDERATION_PREFIX}.heartbeat.{region}")
}
pub fn subject_federation_catalog(region: &str) -> String {
format!("{FEDERATION_PREFIX}.catalog.{region}")
}
pub fn subject_federation_route(from_region: &str, to_region: &str) -> String {
format!("{FEDERATION_PREFIX}.route.{from_region}.{to_region}")
}
pub fn subject_federation_wildcard() -> String {
format!("{FEDERATION_PREFIX}.>")
}
#[cfg(feature = "nats-transport")]
pub async fn connect_nats(url: &str) -> Result<async_nats::Client, async_nats::ConnectError> {
async_nats::connect(url).await
}
#[cfg(feature = "nats-transport")]
pub async fn nats_request<Req: Serialize, Resp: DeserializeOwned>(
client: &async_nats::Client,
subject: &str,
payload: &Req,
timeout: std::time::Duration,
) -> Result<Resp, NatsTransportError> {
let bytes = serde_json::to_vec(payload).map_err(NatsTransportError::Serialize)?;
let resp = tokio::time::timeout(timeout, client.request(subject.to_string(), bytes.into()))
.await
.map_err(|_| NatsTransportError::Timeout)?
.map_err(NatsTransportError::Request)?;
serde_json::from_slice(&resp.payload).map_err(NatsTransportError::Deserialize)
}
#[cfg(feature = "nats-transport")]
pub async fn nats_publish<T: Serialize>(
client: &async_nats::Client,
subject: &str,
payload: &T,
) -> Result<(), NatsTransportError> {
let bytes = serde_json::to_vec(payload).map_err(NatsTransportError::Serialize)?;
#[cfg(feature = "otel")]
{
use opentelemetry::trace::TraceContextExt;
use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::Span::current();
let context = span.context();
let span_ref = context.span();
let span_context = span_ref.span_context();
if span_context.is_valid() {
let traceparent = format!(
"00-{}-{}-{:02x}",
span_context.trace_id(),
span_context.span_id(),
span_context.trace_flags().to_u8(),
);
let mut headers = async_nats::HeaderMap::new();
headers.insert("traceparent", traceparent.as_str());
return client
.publish_with_headers(subject.to_string(), headers, bytes.into())
.await
.map_err(NatsTransportError::Publish);
}
}
client
.publish(subject.to_string(), bytes.into())
.await
.map_err(NatsTransportError::Publish)
}
#[cfg(feature = "nats-transport")]
#[derive(Debug, thiserror::Error)]
pub enum NatsTransportError {
#[error("serialization failed: {0}")]
Serialize(serde_json::Error),
#[error("deserialization failed: {0}")]
Deserialize(serde_json::Error),
#[error("NATS request failed: {0}")]
Request(async_nats::RequestError),
#[error("NATS publish failed: {0}")]
Publish(async_nats::PublishError),
#[error("request timed out")]
Timeout,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_subject_register() {
assert_eq!(subject_register(), "varpulis.cluster.register");
}
#[test]
fn test_subject_heartbeat() {
assert_eq!(subject_heartbeat("w0"), "varpulis.cluster.heartbeat.w0");
}
#[test]
fn test_subject_cmd() {
assert_eq!(
subject_cmd("w1", "deploy"),
"varpulis.cluster.cmd.w1.deploy"
);
assert_eq!(
subject_cmd("w1", "inject"),
"varpulis.cluster.cmd.w1.inject"
);
}
#[test]
fn test_subject_cmd_wildcard() {
assert_eq!(subject_cmd_wildcard("w1"), "varpulis.cluster.cmd.w1.>");
}
#[test]
fn test_subject_pipeline() {
assert_eq!(
subject_pipeline("mandelbrot", "ingress", "row0"),
"varpulis.cluster.pipeline.mandelbrot.ingress.row0"
);
}
#[test]
fn test_subject_raft() {
assert_eq!(subject_raft(1, "vote"), "varpulis.cluster.raft.1.vote");
assert_eq!(subject_raft(3, "append"), "varpulis.cluster.raft.3.append");
}
#[test]
fn test_subject_raft_wildcard() {
assert_eq!(subject_raft_wildcard(2), "varpulis.cluster.raft.2.>");
}
}