use crate::config::UrlVar;
use crate::logs::default_values::*;
use crate::logs::env_variables::*;
use crate::logs::ExportingEnabled;
use crate::{ApiError, CliState, TransportRouteResolver};
use crate::{DefaultAddress, Result};
use ockam::identity::{get_default_timeout, Identifier, SecureClient, TrustIdentifierPolicy};
use ockam_core::env::{get_env_with_default, FromString};
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;
use ockam_transport_tcp::TcpTransport;
use std::fmt::{Debug, Display, Formatter};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tokio_retry::strategy::FixedInterval;
use url::Url;
/// Settings controlling whether and how spans and log records are exported.
///
/// Values are produced by [`ExportingConfiguration::foreground`],
/// [`ExportingConfiguration::background`] or [`ExportingConfiguration::off`].
#[derive(Debug, Clone)]
pub struct ExportingConfiguration {
/// Whether exporting is switched on (see `is_enabled`).
enabled: ExportingEnabled,
/// Timeout for span exports (read from `OCKAM_SPAN_EXPORT_TIMEOUT`).
span_export_timeout: Duration,
/// Timeout for log exports (read from `OCKAM_LOG_EXPORT_TIMEOUT`).
log_export_timeout: Duration,
/// Scheduled delay for span exports; foreground and background use different variables.
span_export_scheduled_delay: Duration,
/// Scheduled delay for log exports; foreground and background use different variables.
log_export_scheduled_delay: Duration,
/// Queue size for span exports (read from `OCKAM_SPAN_EXPORT_QUEUE_SIZE`).
span_export_queue_size: u16,
/// Queue size for log exports (read from `OCKAM_LOG_EXPORT_QUEUE_SIZE`).
log_export_queue_size: u16,
/// Endpoint that telemetry data is sent to.
opentelemetry_endpoint: TelemetryEndpoint,
/// Value of the `OCKAM_DEVELOPER` environment variable.
is_ockam_developer: bool,
/// Optional span export cutoff; `None` when the configuration is built by `off()`.
/// NOTE(review): the exact meaning of the cutoff is defined by the consumer of this value.
span_export_cutoff: Option<Duration>,
/// Optional log export cutoff; `None` when the configuration is built by `off()`.
/// NOTE(review): the exact meaning of the cutoff is defined by the consumer of this value.
log_export_cutoff: Option<Duration>,
}
impl ExportingConfiguration {
    /// Return true if exporting is switched on.
    pub fn is_enabled(&self) -> bool {
        matches!(self.enabled, ExportingEnabled::On)
    }

    /// Return the stored `OCKAM_DEVELOPER` flag.
    pub fn is_ockam_developer(&self) -> bool {
        self.is_ockam_developer
    }

    /// Return the timeout used when exporting logs.
    pub fn log_export_timeout(&self) -> Duration {
        self.log_export_timeout
    }

    /// Return the scheduled delay used when exporting logs.
    pub fn log_export_scheduled_delay(&self) -> Duration {
        self.log_export_scheduled_delay
    }

    /// Return the timeout used when exporting spans.
    pub fn span_export_timeout(&self) -> Duration {
        self.span_export_timeout
    }

    /// Return the scheduled delay used when exporting spans.
    pub fn span_export_scheduled_delay(&self) -> Duration {
        self.span_export_scheduled_delay
    }

    /// Return the queue size used when exporting spans.
    pub fn span_export_queue_size(&self) -> u16 {
        self.span_export_queue_size
    }

    /// Return the queue size used when exporting logs.
    pub fn log_export_queue_size(&self) -> u16 {
        self.log_export_queue_size
    }

    /// Return the span export cutoff, if one is configured.
    pub fn span_export_cutoff(&self) -> Option<Duration> {
        self.span_export_cutoff
    }

    /// Return the log export cutoff, if one is configured.
    pub fn log_export_cutoff(&self) -> Option<Duration> {
        self.log_export_cutoff
    }

    /// Return a copy of the endpoint that telemetry data is exported to.
    pub fn opentelemetry_endpoint(&self) -> TelemetryEndpoint {
        self.opentelemetry_endpoint.clone()
    }

    /// Build the exporting configuration with the foreground settings.
    ///
    /// When no endpoint is available the `off()` configuration is returned. Otherwise the
    /// endpoint is probed with the foreground connection timeout to decide whether exporting
    /// is enabled.
    pub async fn foreground(state: &CliState, ctx: &Context) -> Result<ExportingConfiguration> {
        let endpoint = match opentelemetry_endpoint(state, ctx).await? {
            Some(endpoint) => endpoint,
            None => return ExportingConfiguration::off(),
        };
        let check_timeout = opentelemetry_endpoint_foreground_connection_timeout()?;
        let enabled = exporting_enabled(&endpoint, ctx, check_timeout).await?;
        Self::make_foreground_exporting_configuration(endpoint, enabled)
    }

    /// Build the exporting configuration with the background settings.
    ///
    /// When no endpoint is available the `off()` configuration is returned. Otherwise the
    /// endpoint is probed with the background connection timeout to decide whether exporting
    /// is enabled.
    pub async fn background(state: &CliState, ctx: &Context) -> Result<ExportingConfiguration> {
        let endpoint = match opentelemetry_endpoint(state, ctx).await? {
            Some(endpoint) => endpoint,
            None => return ExportingConfiguration::off(),
        };
        let check_timeout = opentelemetry_endpoint_background_connection_timeout()?;
        let enabled = exporting_enabled(&endpoint, ctx, check_timeout).await?;
        Self::make_background_exporting_configuration(endpoint, enabled)
    }

    /// Build a configuration where exporting is disabled.
    ///
    /// Timeouts, delays and queue sizes take their default constants, the endpoint is the
    /// default HTTPS endpoint and no cutoff is set.
    pub fn off() -> Result<ExportingConfiguration> {
        let opentelemetry_endpoint = Self::default_telemetry_endpoint()?;
        let developer = is_ockam_developer()?;
        Ok(ExportingConfiguration {
            enabled: ExportingEnabled::Off,
            span_export_timeout: DEFAULT_EXPORT_TIMEOUT,
            log_export_timeout: DEFAULT_EXPORT_TIMEOUT,
            span_export_scheduled_delay: DEFAULT_FOREGROUND_EXPORT_SCHEDULED_DELAY,
            log_export_scheduled_delay: DEFAULT_FOREGROUND_EXPORT_SCHEDULED_DELAY,
            span_export_queue_size: DEFAULT_SPAN_EXPORT_QUEUE_SIZE,
            log_export_queue_size: DEFAULT_LOG_EXPORT_QUEUE_SIZE,
            opentelemetry_endpoint,
            is_ockam_developer: developer,
            span_export_cutoff: None,
            log_export_cutoff: None,
        })
    }

    /// Assemble a configuration for `endpoint` from the foreground environment settings.
    ///
    /// The settings are read in field order, so the first invalid one determines the error.
    pub fn make_foreground_exporting_configuration(
        endpoint: TelemetryEndpoint,
        enabled: ExportingEnabled,
    ) -> Result<ExportingConfiguration> {
        let span_export_timeout = span_export_timeout()?;
        let log_export_timeout = log_export_timeout()?;
        let span_export_scheduled_delay = foreground_span_export_scheduled_delay()?;
        let log_export_scheduled_delay = foreground_log_export_scheduled_delay()?;
        let span_export_queue_size = span_export_queue_size()?;
        let log_export_queue_size = log_export_queue_size()?;
        let developer = is_ockam_developer()?;
        let span_cutoff = foreground_span_export_portal_cutoff()?;
        let log_cutoff = foreground_log_export_cutoff()?;
        Ok(ExportingConfiguration {
            enabled,
            span_export_timeout,
            log_export_timeout,
            span_export_scheduled_delay,
            log_export_scheduled_delay,
            span_export_queue_size,
            log_export_queue_size,
            opentelemetry_endpoint: endpoint,
            is_ockam_developer: developer,
            span_export_cutoff: Some(span_cutoff),
            log_export_cutoff: Some(log_cutoff),
        })
    }

    /// Assemble a configuration for `endpoint` from the background environment settings.
    ///
    /// The settings are read in field order, so the first invalid one determines the error.
    pub fn make_background_exporting_configuration(
        endpoint: TelemetryEndpoint,
        enabled: ExportingEnabled,
    ) -> Result<ExportingConfiguration> {
        let span_export_timeout = span_export_timeout()?;
        let log_export_timeout = log_export_timeout()?;
        let span_export_scheduled_delay = background_span_export_scheduled_delay()?;
        let log_export_scheduled_delay = background_log_export_scheduled_delay()?;
        let span_export_queue_size = span_export_queue_size()?;
        let log_export_queue_size = log_export_queue_size()?;
        let developer = is_ockam_developer()?;
        let span_cutoff = background_span_export_portal_cutoff()?;
        let log_cutoff = background_log_export_cutoff()?;
        Ok(ExportingConfiguration {
            enabled,
            span_export_timeout,
            log_export_timeout,
            span_export_scheduled_delay,
            log_export_scheduled_delay,
            span_export_queue_size,
            log_export_queue_size,
            opentelemetry_endpoint: endpoint,
            is_ockam_developer: developer,
            span_export_cutoff: Some(span_cutoff),
            log_export_cutoff: Some(log_cutoff),
        })
    }

    /// Return the endpoint stored in a configuration where exporting is off.
    fn default_telemetry_endpoint() -> Result<TelemetryEndpoint> {
        get_https_endpoint()
    }
}
impl Display for ExportingConfiguration {
    /// Render the configuration as a `tracing` struct that only shows the `enabled` flag.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let enabled = self.enabled.to_string();
        let mut rendered = f.debug_struct("tracing");
        rendered.field("enabled", &enabled);
        rendered.finish()
    }
}
/// Destination that telemetry data is exported to.
#[derive(Clone)]
pub enum TelemetryEndpoint {
/// A node reached through a `SecureClient`. The `String` is the name of the forwarder
/// service on that node (displayed as `<secure route> => 0#<service name>`).
SecureChannelEndpoint(SecureClient, String),
/// A collector addressed by its URL.
HttpsEndpoint(Url),
}
impl Display for TelemetryEndpoint {
    /// Write the URL for an HTTPS endpoint, or `<secure route> => 0#<forwarder service>`
    /// for a secure channel endpoint.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            TelemetryEndpoint::HttpsEndpoint(url) => f.write_str(url.as_str()),
            TelemetryEndpoint::SecureChannelEndpoint(client, forwarder_service_name) => {
                let secure_route = client.secure_route().to_string();
                write!(f, "{} => 0#{}", secure_route, forwarder_service_name)
            }
        }
    }
}
impl Debug for TelemetryEndpoint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(self.to_string().as_str())
}
}
/// Return the value of `OCKAM_TELEMETRY_EXPORT`.
///
/// The default for that variable is the value of `OCKAM_OPENTELEMETRY_EXPORT`, which itself
/// defaults to `true`. `OCKAM_OPENTELEMETRY_EXPORT` is always read first, so an invalid value
/// there is reported even when `OCKAM_TELEMETRY_EXPORT` is set.
pub fn is_exporting_set() -> Result<bool> {
    let fallback = get_env_with_default(OCKAM_OPENTELEMETRY_EXPORT, true)?;
    let exporting = get_env_with_default(OCKAM_TELEMETRY_EXPORT, fallback)?;
    Ok(exporting)
}
/// Return the value of `OCKAM_TELEMETRY_EXPORT_VIA_PROJECT`, with `true` as the default.
pub fn is_exporting_via_project_set() -> Result<bool> {
    let via_project = get_env_with_default(OCKAM_TELEMETRY_EXPORT_VIA_PROJECT, true)?;
    Ok(via_project)
}
/// Return the value of `OCKAM_TELEMETRY_EXPORT_VIA_AUTHORITY`, with `false` as the default.
pub fn is_exporting_via_authority_set() -> Result<bool> {
    let via_authority = get_env_with_default(OCKAM_TELEMETRY_EXPORT_VIA_AUTHORITY, false)?;
    Ok(via_authority)
}
/// Return the route read from `OCKAM_TELEMETRY_EXPORT_NODE_ROUTE`, with `None` as the default.
pub fn telemetry_export_node_route() -> Result<Option<MultiAddr>> {
    let route: Option<MultiAddr> = get_env_with_default(OCKAM_TELEMETRY_EXPORT_NODE_ROUTE, None)?;
    Ok(route)
}
/// Return the identifier read from `OCKAM_TELEMETRY_EXPORT_NODE_IDENTIFIER`, with `None` as
/// the default.
pub fn telemetry_export_node_identifier() -> Result<Option<Identifier>> {
    let identifier: Option<Identifier> =
        get_env_with_default(OCKAM_TELEMETRY_EXPORT_NODE_IDENTIFIER, None)?;
    Ok(identifier)
}
/// Return the forwarder service name read from `OCKAM_TELEMETRY_EXPORT_NODE_FORWARDER_SERVICE`,
/// with `DefaultAddress::GRPC_FORWARDER` as the default.
pub fn telemetry_export_node_forwarder_service() -> Result<String> {
    let default_service = DefaultAddress::GRPC_FORWARDER.to_string();
    let service = get_env_with_default(
        OCKAM_TELEMETRY_EXPORT_NODE_FORWARDER_SERVICE,
        default_service,
    )?;
    Ok(service)
}
/// Return the value of `OCKAM_OPENTELEMETRY_EXPORT_DEBUG`, with `false` as the default.
pub fn is_export_debug_set() -> Result<bool> {
    let debug = get_env_with_default(OCKAM_OPENTELEMETRY_EXPORT_DEBUG, false)?;
    Ok(debug)
}
/// Print `message` on stdout, but only when export debugging is switched on.
///
/// A failure to read the debug flag is treated as "debugging off".
fn print_debug(message: impl Into<String>) {
    let debug_enabled = matches!(is_export_debug_set(), Ok(true));
    if debug_enabled {
        let text: String = message.into();
        println!("{}", text);
    }
}
/// Decide whether exporting is enabled by probing `endpoint`.
///
/// Returns `ExportingEnabled::On` when the endpoint can be reached within
/// `connection_check_timeout`, and `ExportingEnabled::Off` otherwise. The reason is
/// reported through `print_debug`.
async fn exporting_enabled(
    endpoint: &TelemetryEndpoint,
    ctx: &Context,
    connection_check_timeout: Duration,
) -> ockam_core::Result<ExportingEnabled> {
    let reachable = is_endpoint_accessible(endpoint, ctx, connection_check_timeout).await;
    if reachable {
        print_debug("Exporting is enabled");
        return Ok(ExportingEnabled::On);
    }

    let endpoint_kind = match endpoint {
        TelemetryEndpoint::SecureChannelEndpoint(..) => "Node telemetry collector endpoint",
        TelemetryEndpoint::HttpsEndpoint(..) => "HTTPs telemetry collector endpoint",
    };
    let unreachable_message = format!("Exporting telemetry events is disabled because the {} at {} cannot be reached after {}ms", endpoint_kind, endpoint, connection_check_timeout.as_millis());
    print_debug(unreachable_message);
    print_debug("You can disable the export of telemetry events with: `export OCKAM_TELEMETRY_EXPORT=false` to avoid this connection check.");
    print_debug("Exporting is disabled");
    Ok(ExportingEnabled::Off)
}
/// Return true if `endpoint` can be reached within `connection_check_timeout`.
///
/// HTTPS endpoints are probed through `is_url_accessible`; secure channel endpoints
/// are probed through `is_node_accessible`.
async fn is_endpoint_accessible(
    endpoint: &TelemetryEndpoint,
    ctx: &Context,
    connection_check_timeout: Duration,
) -> bool {
    match endpoint {
        TelemetryEndpoint::HttpsEndpoint(url) => {
            print_debug("check if the endpoint is accessible");
            is_url_accessible(url, connection_check_timeout).await
        }
        TelemetryEndpoint::SecureChannelEndpoint(client, _) => {
            is_node_accessible(client, ctx, &connection_check_timeout).await
        }
    }
}
/// Return true if the secure channel check succeeds on a copy of `secure_client` whose
/// secure channel timeout is set to `connection_check_timeout`.
async fn is_node_accessible(
    secure_client: &SecureClient,
    ctx: &Context,
    connection_check_timeout: &Duration,
) -> bool {
    let outcome = secure_client
        .clone()
        .with_secure_channel_timeout(connection_check_timeout)
        .check_secure_channel(ctx)
        .await;
    outcome.is_ok()
}
/// Return true if a TCP connection to the host and port of `url` can be opened.
///
/// Connection attempts are made with a 100ms timeout each, separated by a 100ms pause,
/// until one succeeds or at least `connection_check_timeout` has elapsed since the start.
/// A URL that cannot be turned into a socket address is reported as not accessible.
async fn is_url_accessible(url: &Url, connection_check_timeout: Duration) -> bool {
    let address = match to_socket_addr(url) {
        Some(address) => address,
        None => {
            print_debug(format!(
                "the url {url} can not be parsed as a socket address"
            ));
            return false;
        }
    };

    let started = Instant::now();
    // FixedInterval yields the same 100ms duration on every iteration; it is used both as
    // the timeout of one attempt and as the pause before the next one.
    for timeout_duration in FixedInterval::from_millis(100) {
        print_debug(format!(
            "trying to connect to {address} in {timeout_duration:?}"
        ));
        let connected = matches!(
            tokio::time::timeout(timeout_duration, tokio::net::TcpStream::connect(&address)).await,
            Ok(Ok(_))
        );
        if connected {
            return true;
        }

        print_debug(format!(
            "elapsed: {:?}, timeout {:?}",
            started.elapsed(),
            connection_check_timeout
        ));
        if started.elapsed() >= connection_check_timeout {
            return false;
        }
        tokio::time::sleep(timeout_duration).await;
    }
    false
}
/// Resolve the host and port of `url` to the first matching socket address.
///
/// Port 443 is used when the URL carries no explicit port. `None` is returned when the
/// URL has no host or when the resolution fails or yields no address.
fn to_socket_addr(url: &Url) -> Option<SocketAddr> {
    let host = url.host_str()?;
    let port = url.port().unwrap_or(443);
    let mut resolved = (host, port).to_socket_addrs().ok()?;
    resolved.next()
}
async fn opentelemetry_endpoint(
cli_state: &CliState,
ctx: &Context,
) -> Result<Option<TelemetryEndpoint>> {
let route_and_identifier = match (
telemetry_export_node_route()?,
telemetry_export_node_identifier()?,
) {
(Some(route), Some(identifier)) => Some((route, identifier)),
_ => {
if let Ok(project) = cli_state.projects().get_default_project().await {
if project.is_ready() {
let via_project = is_exporting_via_project_set()?;
let via_authority = is_exporting_via_authority_set()? && !via_project;
if via_project {
print_debug("The project node is used as a telemetry endpoint");
Some((
project.project_multiaddr()?.clone(),
project.project_identifier().ok_or(ApiError::message(
"The default project must have an identifier",
))?,
))
} else if via_authority {
print_debug("The authority node is used as a telemetry endpoint");
Some((
project.authority_multiaddr()?.clone(),
project.authority_identifier().ok_or(ApiError::message(
"The default project authority must have an identifier",
))?,
))
} else {
print_debug(
"The default project is ready but export via the project node or the authority node is disabled. Getting the default HTTPs endpoint",
);
None
}
} else {
print_debug(
"The default project is not ready. Getting the default HTTPs endpoint",
);
None
}
} else {
print_debug("There is no default project. Getting the default HTTPs endpoint");
None
}
}
};
let endpoint = if let Some((route, identifier)) = route_and_identifier {
let client = make_secure_client(cli_state, ctx, route, identifier).await?;
TelemetryEndpoint::SecureChannelEndpoint(client, telemetry_export_node_forwarder_service()?)
} else {
get_https_endpoint()?
};
print_debug(format!("Exporting telemetry data to: {endpoint}"));
Ok(Some(endpoint))
}
/// Create a `SecureClient` for the node reachable at `route`, trusting only `identifier`.
///
/// The client uses the secure channels and identifier of the default node when there is one.
/// Otherwise it uses the general secure channels and the default named identity, which is
/// created if missing. Only TCP routes are allowed when resolving `route`.
async fn make_secure_client(
    cli_state: &CliState,
    ctx: &Context,
    route: MultiAddr,
    identifier: Identifier,
) -> Result<SecureClient> {
    let project_route = TransportRouteResolver::default()
        .allow_tcp()
        .resolve(&route)?;

    let (secure_channels, node_identifier) = match cli_state.get_default_node().await {
        Ok(node) => {
            let channels = cli_state.secure_channels_for_node(&node.name()).await?;
            (channels, node.identifier())
        }
        Err(_) => {
            let channels = cli_state.secure_channels().await?;
            let identity = cli_state.get_or_create_default_named_identity().await?;
            (channels, identity.identifier())
        }
    };

    let transport = TcpTransport::get_or_create(ctx)?;
    let client = SecureClient::new(
        secure_channels,
        None,
        transport,
        project_route,
        Arc::new(TrustIdentifierPolicy::new(identifier)),
        &node_identifier,
        get_default_timeout(),
        get_default_timeout(),
    );
    Ok(client)
}
/// Return the HTTPS telemetry endpoint built from `get_https_endpoint_url`.
pub fn get_https_endpoint() -> Result<TelemetryEndpoint> {
    let url = get_https_endpoint_url()?;
    Ok(TelemetryEndpoint::HttpsEndpoint(url))
}
/// Return the URL read from `OCKAM_OPENTELEMETRY_ENDPOINT`, with
/// `DEFAULT_OPENTELEMETRY_ENDPOINT` as the default.
///
/// The default is parsed before the variable is read, so a malformed default is always
/// reported as an error.
pub fn get_https_endpoint_url() -> Result<Url> {
    let default_endpoint = UrlVar::from_string(DEFAULT_OPENTELEMETRY_ENDPOINT)?;
    let configured: UrlVar = get_env_with_default(OCKAM_OPENTELEMETRY_ENDPOINT, default_endpoint)?;
    Ok(configured.url)
}
/// Return the value of `OCKAM_DEVELOPER`, with `false` as the default.
fn is_ockam_developer() -> Result<bool> {
    let developer = get_env_with_default(OCKAM_DEVELOPER, false)?;
    Ok(developer)
}
/// Return the span export timeout read from `OCKAM_SPAN_EXPORT_TIMEOUT`, with
/// `DEFAULT_EXPORT_TIMEOUT` as the default.
pub fn span_export_timeout() -> Result<Duration> {
    let timeout = get_env_with_default(OCKAM_SPAN_EXPORT_TIMEOUT, DEFAULT_EXPORT_TIMEOUT)?;
    Ok(timeout)
}
/// Return the background connection check timeout read from
/// `OCKAM_BACKGROUND_TELEMETRY_ENDPOINT_CONNECTION_TIMEOUT`, with
/// `DEFAULT_TELEMETRY_ENDPOINT_BACKGROUND_CONNECTION_TIMEOUT` as the default.
fn opentelemetry_endpoint_background_connection_timeout() -> Result<Duration> {
    let timeout = get_env_with_default(
        OCKAM_BACKGROUND_TELEMETRY_ENDPOINT_CONNECTION_TIMEOUT,
        DEFAULT_TELEMETRY_ENDPOINT_BACKGROUND_CONNECTION_TIMEOUT,
    )?;
    Ok(timeout)
}
/// Return the foreground connection check timeout read from
/// `OCKAM_FOREGROUND_TELEMETRY_ENDPOINT_CONNECTION_TIMEOUT`, with
/// `DEFAULT_TELEMETRY_ENDPOINT_FOREGROUND_CONNECTION_TIMEOUT` as the default.
fn opentelemetry_endpoint_foreground_connection_timeout() -> Result<Duration> {
    let timeout = get_env_with_default(
        OCKAM_FOREGROUND_TELEMETRY_ENDPOINT_CONNECTION_TIMEOUT,
        DEFAULT_TELEMETRY_ENDPOINT_FOREGROUND_CONNECTION_TIMEOUT,
    )?;
    Ok(timeout)
}
/// Return the foreground span export delay read from
/// `OCKAM_FOREGROUND_SPAN_EXPORT_SCHEDULED_DELAY`, with
/// `DEFAULT_FOREGROUND_EXPORT_SCHEDULED_DELAY` as the default.
fn foreground_span_export_scheduled_delay() -> Result<Duration> {
    let delay = get_env_with_default(
        OCKAM_FOREGROUND_SPAN_EXPORT_SCHEDULED_DELAY,
        DEFAULT_FOREGROUND_EXPORT_SCHEDULED_DELAY,
    )?;
    Ok(delay)
}
/// Return the background span export delay read from
/// `OCKAM_BACKGROUND_SPAN_EXPORT_SCHEDULED_DELAY`, with
/// `DEFAULT_BACKGROUND_EXPORT_SCHEDULED_DELAY` as the default.
fn background_span_export_scheduled_delay() -> Result<Duration> {
    let delay = get_env_with_default(
        OCKAM_BACKGROUND_SPAN_EXPORT_SCHEDULED_DELAY,
        DEFAULT_BACKGROUND_EXPORT_SCHEDULED_DELAY,
    )?;
    Ok(delay)
}
/// Return the span export queue size read from `OCKAM_SPAN_EXPORT_QUEUE_SIZE`, with
/// `DEFAULT_SPAN_EXPORT_QUEUE_SIZE` as the default.
fn span_export_queue_size() -> Result<u16> {
    let size = get_env_with_default(OCKAM_SPAN_EXPORT_QUEUE_SIZE, DEFAULT_SPAN_EXPORT_QUEUE_SIZE)?;
    Ok(size)
}
/// Return the log export queue size read from `OCKAM_LOG_EXPORT_QUEUE_SIZE`, with
/// `DEFAULT_LOG_EXPORT_QUEUE_SIZE` as the default.
fn log_export_queue_size() -> Result<u16> {
    let size = get_env_with_default(OCKAM_LOG_EXPORT_QUEUE_SIZE, DEFAULT_LOG_EXPORT_QUEUE_SIZE)?;
    Ok(size)
}
/// Return the log export timeout read from `OCKAM_LOG_EXPORT_TIMEOUT`, with
/// `DEFAULT_EXPORT_TIMEOUT` as the default.
pub fn log_export_timeout() -> Result<Duration> {
    let timeout = get_env_with_default(OCKAM_LOG_EXPORT_TIMEOUT, DEFAULT_EXPORT_TIMEOUT)?;
    Ok(timeout)
}
/// Return the foreground log export delay read from
/// `OCKAM_FOREGROUND_LOG_EXPORT_SCHEDULED_DELAY`, with
/// `DEFAULT_FOREGROUND_EXPORT_SCHEDULED_DELAY` as the default.
pub fn foreground_log_export_scheduled_delay() -> Result<Duration> {
    let delay = get_env_with_default(
        OCKAM_FOREGROUND_LOG_EXPORT_SCHEDULED_DELAY,
        DEFAULT_FOREGROUND_EXPORT_SCHEDULED_DELAY,
    )?;
    Ok(delay)
}
/// Return the background log export delay read from
/// `OCKAM_BACKGROUND_LOG_EXPORT_SCHEDULED_DELAY`, with
/// `DEFAULT_BACKGROUND_EXPORT_SCHEDULED_DELAY` as the default.
pub fn background_log_export_scheduled_delay() -> Result<Duration> {
    let delay = get_env_with_default(
        OCKAM_BACKGROUND_LOG_EXPORT_SCHEDULED_DELAY,
        DEFAULT_BACKGROUND_EXPORT_SCHEDULED_DELAY,
    )?;
    Ok(delay)
}
/// Return the foreground log export cutoff read from `OCKAM_FOREGROUND_LOG_EXPORT_CUTOFF`,
/// with `DEFAULT_FOREGROUND_LOG_EXPORT_CUTOFF` as the default.
pub fn foreground_log_export_cutoff() -> Result<Duration> {
    let cutoff = get_env_with_default(
        OCKAM_FOREGROUND_LOG_EXPORT_CUTOFF,
        DEFAULT_FOREGROUND_LOG_EXPORT_CUTOFF,
    )?;
    Ok(cutoff)
}
/// Return the foreground span export cutoff read from `OCKAM_FOREGROUND_SPAN_EXPORT_CUTOFF`,
/// with `DEFAULT_FOREGROUND_SPAN_EXPORT_CUTOFF` as the default.
pub fn foreground_span_export_portal_cutoff() -> Result<Duration> {
    let cutoff = get_env_with_default(
        OCKAM_FOREGROUND_SPAN_EXPORT_CUTOFF,
        DEFAULT_FOREGROUND_SPAN_EXPORT_CUTOFF,
    )?;
    Ok(cutoff)
}
/// Return the background log export cutoff read from `OCKAM_BACKGROUND_LOG_EXPORT_CUTOFF`,
/// with `DEFAULT_BACKGROUND_LOG_EXPORT_CUTOFF` as the default.
pub fn background_log_export_cutoff() -> Result<Duration> {
    let cutoff = get_env_with_default(
        OCKAM_BACKGROUND_LOG_EXPORT_CUTOFF,
        DEFAULT_BACKGROUND_LOG_EXPORT_CUTOFF,
    )?;
    Ok(cutoff)
}
/// Return the background span export cutoff read from `OCKAM_BACKGROUND_SPAN_EXPORT_CUTOFF`,
/// with `DEFAULT_BACKGROUND_SPAN_EXPORT_CUTOFF` as the default.
pub fn background_span_export_portal_cutoff() -> Result<Duration> {
    let cutoff = get_env_with_default(
        OCKAM_BACKGROUND_SPAN_EXPORT_CUTOFF,
        DEFAULT_BACKGROUND_SPAN_EXPORT_CUTOFF,
    )?;
    Ok(cutoff)
}