use crate::{
aspect_map::AspectMap,
connection::{
net_connection::{NetHandler, NetWorker},
NetResult,
},
p2p_network::Lib3hClientProtocolWrapped,
};
use failure::_core::time::Duration;
use holochain_conductor_lib_api::{ConductorApi, CryptoMethod};
use holochain_json_api::{error::JsonError, json::JsonString};
use holochain_metrics::{DefaultMetricPublisher, MetricPublisher};
use in_stream::*;
use lib3h_protocol::{
data_types::{
EntryListData, FetchEntryData, GenericResultData, Opaque, SpaceData, StoreEntryAspectData,
},
protocol::*,
protocol_client::Lib3hClientProtocol,
protocol_server::Lib3hServerProtocol,
types::{AgentPubKey, SpaceHash},
uri::Lib3hUri,
Address,
};
use log::*;
use sim2h::{
crypto::{Provenance, SignedWireMessage},
generate_ack_receipt_hash, TcpWss, WireError, WireMessage, WIRE_VERSION,
};
use std::{convert::TryFrom, time::Instant};
use url::Url;
use url2::prelude::*;
/// Connection timeout (ms) the backoff starts from and is reset to once the
/// connection is ready. `check_reconnect` raises the backoff before every
/// attempt, so the first attempt already runs with twice this value.
const INITIAL_CONNECTION_TIMEOUT_MS: u64 = 2_000;
/// Intended ceiling (ms) for the connection timeout backoff.
const MAX_CONNECTION_TIMEOUT_MS: u64 = 60_000;
/// Request id the worker puts on fetch requests it issues itself, so that their
/// results can be told apart from results destined for the sim2h server.
const SIM2H_WORKER_INTERNAL_REQUEST_ID: &str = "SIM2H_WORKER";
/// Minimum time (ms) before the message at the head of the outgoing buffer is sent again.
const RESEND_WIRE_MESSAGE_MS: u64 = 10_000;
/// Minimum time (ms) between two flushes of batched fat acks and fetch-entry results.
const BATCHING_INTERVAL_MS: u64 = 1_000;
/// Opens a websocket stream to `url`, using `timeout_ms` as the TCP connect timeout.
///
/// Any error reported by the stream layer is converted into the `NetResult` error.
fn connect(url: Lib3hUri, timeout_ms: u64) -> NetResult<TcpWss> {
    let tcp_config = TcpConnectConfig {
        connect_timeout_ms: Some(timeout_ms),
    };
    let config = WssConnectConfig::new(TlsConnectConfig::new(tcp_config));
    let stream = InStreamWss::connect(&url::Url::from(url).into(), config)?;
    Ok(stream)
}
/// Configuration of the sim2h network worker.
#[derive(Deserialize, Serialize, Clone, Debug, DefaultJson, PartialEq)]
pub struct Sim2hConfig {
/// URL of the sim2h server; `Sim2hWorker::new` turns it into the worker's `server_url`.
pub sim2h_url: String,
}
/// An entry of the worker's outgoing message buffer.
#[derive(Debug)]
struct BufferedMessage {
/// The message waiting to be sent (or waiting to be acknowledged).
pub wire_message: WireMessage,
/// Ack-receipt hash of the payload, recorded when the message is written to the
/// connection and compared against incoming `Ack`s; `0` until the first send.
pub hash: u64,
/// When the message was last written to the connection; `None` if it has not been
/// sent yet or if a prepended message reset it.
pub last_sent: Option<Instant>,
}
impl From<WireMessage> for BufferedMessage {
fn from(wire_message: WireMessage) -> BufferedMessage {
BufferedMessage {
wire_message,
hash: 0,
last_sent: None,
}
}
}
/// Network worker that relays messages between core and a sim2h server over a
/// websocket connection.
#[allow(non_snake_case, dead_code)]
pub struct Sim2hWorker {
/// Callback that receives every message queued in `to_core` during `tick`.
handler: NetHandler,
/// Websocket connection to the sim2h server; `None` while disconnected.
connection: Option<TcpWss>,
/// Messages handed over by core through `receive`, processed on the next `tick`.
inbox: Vec<ht::EncodedSpanWrap<Lib3hClientProtocol>>,
/// Messages waiting to be passed to `handler` on the next `tick`.
to_core: Vec<ht::EncodedSpanWrap<Lib3hServerProtocol>>,
/// Address of the sim2h server, taken from `Sim2hConfig::sim2h_url`.
server_url: Lib3hUri,
/// Space data of the last `JoinSpace` sent by core; reused to re-join after a reconnect.
space_data: Option<SpaceData>,
/// Agent address used as provenance of every signed wire message.
agent_id: Address,
/// Conductor API used to sign payloads.
conductor_api: ConductorApi,
/// When the last (re)connection attempt was started.
time_of_last_connection_attempt: Instant,
/// Current connect timeout in milliseconds.
connection_timeout_backoff: u64,
/// Minimum time between two connection attempts; kept equal to
/// `connection_timeout_backoff` expressed as a `Duration`.
reconnect_interval: Duration,
/// Receives the `sim2h_worker.tick.latency` metric.
metric_publisher: std::sync::Arc<std::sync::RwLock<dyn MetricPublisher>>,
/// When batched messages were last flushed into the outgoing buffer.
time_of_last_batching: Instant,
/// Fetch-entry results collected until the next batch flush.
outgoing_fetch_entry_results: Vec<ht::EncodedSpanWrap<Lib3hToClientResponse>>,
/// Merged gossiping-list results with an empty request id ("fat acks"), collected
/// until the next batch flush.
outgoing_fat_acks: AspectMap,
/// Ordered queue of messages for the server; the head is sent first and removed
/// once the matching `Ack` arrives.
outgoing_message_buffer: Vec<BufferedMessage>,
/// Frame buffer the next websocket read is written into.
ws_frame: Option<WsFrame>,
/// Authoring list reported by core, kept until the self-store step has consumed it.
initial_authoring_list: Option<EntryListData>,
/// Gossiping list reported by core, kept until the self-store step has consumed it.
initial_gossiping_list: Option<EntryListData>,
/// Set once `self_store_authored_aspects` has run, so it runs only once.
has_self_stored_authored_aspects: bool,
/// Set from the server's `HelloResponse` (`redundant_count == 0`).
is_full_sync_DHT: bool,
/// Optional tracer used to create the span of the re-join message.
tracer: Option<ht::Tracer>,
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_NET)]
impl Sim2hWorker {
/// Consumes the worker and returns a fixed placeholder URL (`ws://example.com`).
pub fn advertise(self) -> url::Url {
    let placeholder = "ws://example.com";
    Url::parse(placeholder).unwrap()
}
pub fn new(
handler: NetHandler,
config: Sim2hConfig,
agent_id: Address,
conductor_api: ConductorApi,
tracer: Option<ht::Tracer>,
) -> NetResult<Self> {
let reconnect_interval = Duration::from_millis(INITIAL_CONNECTION_TIMEOUT_MS);
let mut instance = Self {
handler,
connection: None,
inbox: Vec::new(),
to_core: Vec::new(),
server_url: url::Url::from(url2!("{}", config.sim2h_url)).into(),
space_data: None,
agent_id,
conductor_api,
connection_timeout_backoff: INITIAL_CONNECTION_TIMEOUT_MS,
reconnect_interval,
time_of_last_connection_attempt: Instant::now()
.checked_sub(reconnect_interval)
.unwrap(),
metric_publisher: std::sync::Arc::new(std::sync::RwLock::new(
DefaultMetricPublisher::default(),
)),
time_of_last_batching: Instant::now(),
outgoing_fetch_entry_results: Vec::new(),
outgoing_fat_acks: AspectMap::new(),
outgoing_message_buffer: Vec::new(),
ws_frame: None,
initial_authoring_list: None,
initial_gossiping_list: None,
has_self_stored_authored_aspects: false,
is_full_sync_DHT: false,
tracer,
};
instance.send_wire_message(WireMessage::Hello(WIRE_VERSION))?;
instance.check_reconnect();
Ok(instance)
}
/// Doubles the connection timeout backoff, never exceeding
/// `MAX_CONNECTION_TIMEOUT_MS`.
///
/// Does nothing once the backoff has reached the ceiling, so the reconnect
/// interval stops growing (and stops being logged) at that point.
fn backoff(&mut self) {
    // `min` makes the constant a ceiling; `saturating_mul` keeps the doubling
    // itself from overflowing.
    let new_backoff = std::cmp::min(
        MAX_CONNECTION_TIMEOUT_MS,
        self.connection_timeout_backoff.saturating_mul(2),
    );
    if self.connection_timeout_backoff != new_backoff {
        self.inner_set_backoff(new_backoff);
    }
}
/// Stores `backoff` (ms) as the current connect timeout and uses the same value
/// as the reconnect interval.
fn inner_set_backoff(&mut self, backoff: u64) {
    let interval = Duration::from_millis(backoff);
    self.connection_timeout_backoff = backoff;
    debug!("BACKOFF setting reconnect interval to {}", backoff);
    self.reconnect_interval = interval;
}
/// Returns the backoff to `INITIAL_CONNECTION_TIMEOUT_MS` if it is currently above it.
fn reset_backoff(&mut self) {
    if self.connection_timeout_backoff <= INITIAL_CONNECTION_TIMEOUT_MS {
        return;
    }
    self.inner_set_backoff(INITIAL_CONNECTION_TIMEOUT_MS);
}
/// Re-establishes the connection when it is not ready and the reconnect interval
/// has passed.
///
/// A ready connection only resets the backoff. Otherwise the backoff is raised, the
/// old connection is dropped and a new one is opened; if space data is known, a
/// `JoinSpace` message is put at the front of the outgoing buffer.
///
/// # Panics
/// Panics if the `JoinSpace` message cannot be queued.
fn check_reconnect(&mut self) {
    if self.connection_ready() {
        self.reset_backoff();
        return;
    }

    let too_soon = self.time_of_last_connection_attempt.elapsed() < self.reconnect_interval;
    if too_soon {
        return;
    }

    warn!(
        "BACKOFF attempting reconnect, connection state: {:?}",
        self.connection
    );
    self.backoff();
    self.time_of_last_connection_attempt = Instant::now();
    self.connection = None;

    let connection = match connect(self.server_url.clone(), self.connection_timeout_backoff) {
        Ok(connection) => connection,
        // A failed attempt is retried on a later call.
        Err(_) => return,
    };

    let mut span: ht::Span = self
        .tracer
        .clone()
        .unwrap_or_else(|| ht::null_tracer())
        .span(format!("Sending Join {}:{}", file!(), line!()))
        .start()
        .into();
    self.connection = Some(connection);

    // Without space data there is nothing to re-join.
    let space_data = match &self.space_data {
        Some(space_data) => space_data,
        None => return,
    };
    span.event(format!("Space Data {:?}", &space_data));
    let msg = WireMessage::ClientToLib3h(
        span.wrap(ClientToLib3h::JoinSpace(space_data.clone()))
            .into(),
    );

    debug!("SENDING JOIN {:#?}", msg);
    self.prepend_wire_message(msg)
        .expect("can send JoinSpace on reconnect");
}
/// Reports whether a connection exists and is ready.
///
/// A handshake error is logged and drops the connection.
fn connection_ready(&mut self) -> bool {
    let status = match self.connection.as_mut() {
        Some(connection) => connection.check_ready(),
        None => return false,
    };
    match status {
        Ok(ready) => ready,
        Err(e) => {
            error!("connection handshake error: {:?}", e);
            self.connection = None;
            false
        }
    }
}
/// Moves the batched fat acks and fetch-entry results into the outgoing buffer as a
/// single `MultiSendResponse`, at most once per `BATCHING_INTERVAL_MS`.
///
/// Fat acks are only flushed when space data is known; otherwise they stay batched.
fn check_batched_messages(&mut self) {
    let batching_interval = Duration::from_millis(BATCHING_INTERVAL_MS);
    if self.time_of_last_batching.elapsed() < batching_interval {
        return;
    }
    self.time_of_last_batching = Instant::now();

    let mut batch: Vec<ht::EncodedSpanWrap<Lib3hToClientResponse>> = Vec::new();

    if !self.outgoing_fat_acks.empty() {
        if let Some(space_data) = &self.space_data {
            let fat_ack =
                Lib3hToClientResponse::HandleGetGossipingEntryListResult(EntryListData {
                    space_address: space_data.space_address.clone(),
                    provider_agent_id: space_data.agent_id.clone(),
                    request_id: "".to_string(),
                    address_map: self.outgoing_fat_acks.clone().into(),
                });
            self.outgoing_fat_acks = AspectMap::new();
            let span = ht::top_follower("fat-acks");
            batch.push(span.wrap(fat_ack).into());
        }
    }

    batch.extend(self.outgoing_fetch_entry_results.drain(..));

    if !batch.is_empty() {
        self.outgoing_message_buffer
            .push(WireMessage::MultiSendResponse(batch).into());
    }
}
/// Tries to write the message at the head of the outgoing buffer to the connection.
///
/// Returns `false` without writing when the buffer is empty, the connection is not
/// ready, the head message was sent less than `RESEND_WIRE_MESSAGE_MS` ago, or a
/// `JoinSpace` payload could not be signed. Returns `true` after a write attempt,
/// including a failed one (which drops the connection and calls `check_reconnect`).
/// The message is not removed from the buffer here.
fn try_send_from_outgoing_buffer(&mut self) -> bool {
if self.outgoing_message_buffer.is_empty() || !self.connection_ready() {
return false;
}
let buffered_message = self.outgoing_message_buffer.get_mut(0).unwrap();
// A message that was sent recently is not re-sent yet.
if let Some(instant_last_sent) = buffered_message.last_sent {
if instant_last_sent.elapsed() < Duration::from_millis(RESEND_WIRE_MESSAGE_MS) {
return false;
}
}
let message = &buffered_message.wire_message;
debug!(
"WireMessage: preparing to send {:?} with hash: {}",
message, buffered_message.hash
);
let payload: String = message.clone().into();
// Only `JoinSpace` is signed through the conductor; every other message carries
// the placeholder signature "null".
let signature = match message {
WireMessage::ClientToLib3h(ht::EncodedSpanWrap {
data: ClientToLib3h::JoinSpace(_),
..
}) => {
let maybe_signature = self
.conductor_api
.execute(payload.clone(), CryptoMethod::Sign);
match maybe_signature {
Err(e) => {
error!(
"Couldn't sign wire message in sim2h worker: payload={}, error={:?}",
payload, e
);
return false;
}
Ok(sig) => sig,
}
}
_ => "null".to_string(), };
let payload: Opaque = payload.into();
let signed_wire_message = SignedWireMessage::new(
payload.clone(),
Provenance::new(self.agent_id.clone(), signature.into()),
);
let to_send: Opaque = signed_wire_message.into();
if let Err(e) = self
.connection
.as_mut()
.unwrap()
.write(to_send.to_vec().into())
{
error!(
"TransportError trying to send message to sim2h server: {:?}",
e
);
// The connection is considered broken: drop it and try to reconnect.
self.connection = None;
self.check_reconnect();
return true;
}
// Remember the receipt hash and the send time of the message just written.
buffered_message.hash = generate_ack_receipt_hash(&payload);
buffered_message.last_sent = Some(Instant::now());
true
}
/// Puts `message` at the front of the outgoing buffer.
///
/// All buffered messages are marked as not sent, and buffered direct messages and
/// direct-message results are dropped before the insert. Always returns `Ok(())`.
fn prepend_wire_message(&mut self, message: WireMessage) -> NetResult<()> {
    debug!("WireMessage: queueing {:?}", message);

    self.outgoing_message_buffer
        .iter_mut()
        .for_each(|buffered| buffered.last_sent = None);

    // Decides whether a buffered message survives the prepend.
    let keep = |wire_message: &WireMessage| -> bool {
        match wire_message {
            WireMessage::ClientToLib3hResponse(span_wrap) => match span_wrap.data {
                ClientToLib3hResponse::SendDirectMessageResult(_) => false,
                _ => true,
            },
            WireMessage::ClientToLib3h(span_wrap) => match span_wrap.data {
                ClientToLib3h::SendDirectMessage(_) => false,
                _ => true,
            },
            _ => true,
        }
    };
    self.outgoing_message_buffer
        .retain(|buffered| keep(&buffered.wire_message));

    self.outgoing_message_buffer.insert(0, message.into());
    Ok(())
}
/// Queues `message` for the server.
///
/// Fetch-entry results and gossiping-list results with an empty request id are not
/// queued directly but collected for the next batch flush; everything else is
/// appended to the outgoing buffer. Always returns `Ok(())`.
fn send_wire_message(&mut self, message: WireMessage) -> NetResult<()> {
    if let WireMessage::Lib3hToClientResponse(span_wrap) = &message {
        match &span_wrap.data {
            Lib3hToClientResponse::HandleFetchEntryResult(_) => {
                self.outgoing_fetch_entry_results.push(span_wrap.clone());
                return Ok(());
            }
            Lib3hToClientResponse::HandleGetGossipingEntryListResult(entry_data)
                if entry_data.request_id == "" =>
            {
                debug!(
                    "WireMessage: batching fat ack, pending ack aspect count is: {}",
                    self.outgoing_fat_acks
                        .bare()
                        .iter()
                        .fold(0, |acc, (_k, v)| acc + v.len()),
                );
                let incoming = AspectMap::from(&entry_data.address_map);
                self.outgoing_fat_acks = AspectMap::merge(&self.outgoing_fat_acks, &incoming);
                return Ok(());
            }
            _ => {}
        }
    }

    debug!(
        "WireMessage: queueing on top of {} messages: {:#?}\nwaiting for: {:#?}",
        self.outgoing_message_buffer.len(),
        message,
        self.outgoing_message_buffer.get(0)
    );
    self.outgoing_message_buffer.push(message.into());
    Ok(())
}
/// Handles one message coming from core.
///
/// Depending on the variant the message is answered locally (by queueing a message
/// in `to_core`), forwarded to the server through `send_wire_message`, or both.
///
/// # Panics
/// Panics when core sends `FetchEntry`.
#[allow(dead_code)]
fn handle_client_message(&mut self, span_wrap: Lib3hClientProtocolWrapped) -> NetResult<()> {
match span_wrap.data.clone() {
// NOTE(review): a `SuccessResult` from core is handed back to core as a
// `FailureResult` - confirm that this mapping is intended.
Lib3hClientProtocol::SuccessResult(generic_result_data) => {
self.to_core.push(
span_wrap.swapped(Lib3hServerProtocol::FailureResult(generic_result_data)),
);
Ok(())
}
// `Connect` is answered locally with a `FailureResult` that carries default
// space/agent values and empty result info; nothing is sent to the server.
Lib3hClientProtocol::Connect(connect_data) => {
let msg = Lib3hServerProtocol::FailureResult(GenericResultData {
request_id: connect_data.request_id,
space_address: SpaceHash::default().into(),
to_agent_id: AgentPubKey::default(),
result_info: Opaque::new(),
});
self.to_core.push(span_wrap.swapped(msg));
Ok(())
}
// Remember the space data (reused when re-joining after a reconnect) and
// forward the join.
Lib3hClientProtocol::JoinSpace(space_data) => {
self.space_data = Some(space_data.clone());
self.send_wire_message(WireMessage::ClientToLib3h(
span_wrap.swapped(ClientToLib3h::JoinSpace(space_data)),
))
}
Lib3hClientProtocol::LeaveSpace(space_data) => {
self.send_wire_message(WireMessage::ClientToLib3h(
span_wrap.swapped(ClientToLib3h::LeaveSpace(space_data)),
))
}
Lib3hClientProtocol::SendDirectMessage(dm_data) => {
self.send_wire_message(WireMessage::ClientToLib3h(
span_wrap.swapped(ClientToLib3h::SendDirectMessage(dm_data)),
))
}
Lib3hClientProtocol::HandleSendDirectMessageResult(dm_data) => {
self.send_wire_message(WireMessage::Lib3hToClientResponse(span_wrap.swapped(
Lib3hToClientResponse::HandleSendDirectMessageResult(dm_data),
)))
}
Lib3hClientProtocol::FetchEntry(_fetch_entry_data) => {
panic!("FetchEntry send by core - this should never happen");
}
// Results of fetches the worker requested itself are turned into one
// `HandleStoreEntryAspect` per aspect for core; all other results go to the server.
Lib3hClientProtocol::HandleFetchEntryResult(fetch_entry_result_data) => {
if fetch_entry_result_data.request_id == SIM2H_WORKER_INTERNAL_REQUEST_ID {
for aspect in fetch_entry_result_data.entry.aspect_list {
let msg =
Lib3hServerProtocol::HandleStoreEntryAspect(StoreEntryAspectData {
request_id: "".into(),
space_address: fetch_entry_result_data.space_address.clone(),
provider_agent_id: fetch_entry_result_data
.provider_agent_id
.clone(),
entry_address: fetch_entry_result_data.entry.entry_address.clone(),
entry_aspect: aspect,
});
self.to_core.push(span_wrap.swapped(msg));
}
Ok(())
} else {
self.send_wire_message(WireMessage::Lib3hToClientResponse(span_wrap.swapped(
Lib3hToClientResponse::HandleFetchEntryResult(fetch_entry_result_data),
)))
}
}
// Every published aspect is first handed back to core as a store request,
// then the publish itself is forwarded to the server.
Lib3hClientProtocol::PublishEntry(provided_entry_data) => {
for aspect in &provided_entry_data.entry.aspect_list {
let msg = Lib3hServerProtocol::HandleStoreEntryAspect(StoreEntryAspectData {
request_id: "".into(),
space_address: provided_entry_data.space_address.clone(),
provider_agent_id: provided_entry_data.provider_agent_id.clone(),
entry_address: provided_entry_data.entry.entry_address.clone(),
entry_aspect: aspect.clone(),
});
self.to_core.push(span_wrap.swapped(msg));
}
self.send_wire_message(WireMessage::ClientToLib3h(
span_wrap.swapped(ClientToLib3h::PublishEntry(provided_entry_data)),
))
}
// An autonomous node loops the query back to core instead of asking the server.
Lib3hClientProtocol::QueryEntry(query_entry_data) => {
if self.is_autonomous_node() {
let msg = Lib3hServerProtocol::HandleQueryEntry(query_entry_data);
self.to_core.push(span_wrap.swapped(msg));
Ok(())
} else {
self.send_wire_message(WireMessage::ClientToLib3h(
span_wrap.swapped(ClientToLib3h::QueryEntry(query_entry_data)),
))
}
}
// An autonomous node loops the query result back to core as well.
Lib3hClientProtocol::HandleQueryEntryResult(query_entry_result_data) => {
if self.is_autonomous_node() {
let msg = Lib3hServerProtocol::QueryEntryResult(query_entry_result_data);
self.to_core.push(span_wrap.swapped(msg));
Ok(())
} else {
self.send_wire_message(WireMessage::Lib3hToClientResponse(span_wrap.swapped(
Lib3hToClientResponse::HandleQueryEntryResult(query_entry_result_data),
)))
}
}
// Keep the authoring list for the self-store step, then forward it.
Lib3hClientProtocol::HandleGetAuthoringEntryListResult(entry_list_data) => {
self.initial_authoring_list = Some(entry_list_data.clone());
if self.is_autonomous_node() {
self.self_store_authored_aspects();
}
self.send_wire_message(WireMessage::Lib3hToClientResponse(span_wrap.swapped(
Lib3hToClientResponse::HandleGetAuthoringEntryListResult(entry_list_data),
)))
}
// Keep the gossiping list for the self-store step, then forward it.
Lib3hClientProtocol::HandleGetGossipingEntryListResult(entry_list_data) => {
self.initial_gossiping_list = Some(entry_list_data.clone());
if self.is_autonomous_node() {
self.self_store_authored_aspects();
}
self.send_wire_message(WireMessage::Lib3hToClientResponse(span_wrap.swapped(
Lib3hToClientResponse::HandleGetGossipingEntryListResult(entry_list_data),
)))
}
// Shutdown is only logged.
Lib3hClientProtocol::Shutdown => {
debug!("Got Lib3hClientProtocol::Shutdown from core in sim2h worker");
Ok(())
}
}
}
/// Asks core, once, to fetch the authored entries that are not completely covered
/// by the gossiping list.
///
/// Runs only when both initial lists are present and consumes them. The fetch
/// requests carry `SIM2H_WORKER_INTERNAL_REQUEST_ID` as request id.
#[autotrace]
fn self_store_authored_aspects(&mut self) {
    let lists_present =
        self.initial_gossiping_list.is_some() && self.initial_authoring_list.is_some();
    if !self.has_self_stored_authored_aspects && lists_present {
        let authoring_list = self.initial_authoring_list.take().unwrap();
        let gossiping_list = self.initial_gossiping_list.take().unwrap();

        for (entry_hash, aspect_hashes) in &authoring_list.address_map {
            // An entry is skipped when every authored aspect also appears in the
            // gossiping list.
            let fully_gossiped = gossiping_list.address_map.get(entry_hash).map_or(
                false,
                |gossiping_aspects| {
                    let mut missing = aspect_hashes.clone();
                    for aspect in gossiping_aspects {
                        missing.remove_item(aspect);
                    }
                    missing.is_empty()
                },
            );
            if fully_gossiped {
                continue;
            }

            let fetch = FetchEntryData {
                space_address: authoring_list.space_address.clone(),
                entry_address: entry_hash.clone(),
                request_id: SIM2H_WORKER_INTERNAL_REQUEST_ID.to_string(),
                provider_agent_id: authoring_list.provider_agent_id.clone(),
                aspect_address_list: Some(aspect_hashes.clone()),
            };
            let msg = Lib3hServerProtocol::HandleFetchEntry(fetch);
            let span = ht::top_follower("pre-send");
            self.to_core.push(span.wrap(msg).into())
        }

        self.has_self_stored_authored_aspects = true;
    }
}
/// Handles one message received from the sim2h server.
///
/// Messages for core are queued in `to_core`, a `Ping` is answered with a `Pong`,
/// an `Ack` removes the matching head of the outgoing buffer, and message kinds the
/// server is not expected to send are logged and ignored.
///
/// # Panics
/// Panics when the server's `HelloResponse` reports a different wire version.
fn handle_server_message(&mut self, message: WireMessage) -> NetResult<()> {
let span = ht::with_top_or_null(|s| s.child("handle_server_message"));
match message {
WireMessage::Ping => self.send_wire_message(WireMessage::Pong)?,
WireMessage::Pong => {}
WireMessage::Lib3hToClient(span_wrap) => self.to_core.push(span_wrap.map(|m| Lib3hServerProtocol::from(m))),
// Each message of a multi-send is passed to core individually.
WireMessage::MultiSend(messages) => {
for span_wrap in messages.into_iter() {
self.to_core.push(span_wrap.map(Lib3hServerProtocol::from));
}
}
WireMessage::MultiSendResponse(m) => {
error!(
"Got a MultiSendResponse from the Sim2h server, weird! Ignoring: {:?}",
m
)
}
WireMessage::ClientToLib3hResponse(span_wrap) => {
self.to_core.push(span_wrap.map(Lib3hServerProtocol::from))
}
WireMessage::Lib3hToClientResponse(m) => error!(
"Got a Lib3hToClientResponse from the Sim2h server, weird! Ignoring: {:?}",
m
),
WireMessage::ClientToLib3h(m) => error!(
"Got a ClientToLib3h from the Sim2h server, weird! Ignoring: {:?}",
m
),
WireMessage::Err(sim2h_error) => match sim2h_error {
// The server answered a message with "in limbo": queue a new join if space
// data is known.
WireError::MessageWhileInLimbo => {
if let Some(space_data) = self.space_data.clone() {
self.send_wire_message(WireMessage::ClientToLib3h(
span.wrap(ClientToLib3h::JoinSpace(space_data)).into(),
))?;
} else {
error!("Uh oh, we got a MessageWhileInLimbo errro and we don't have space data. Did core send a message before sending a join? This should not happen.");
}
}
WireError::Other(e) => error!("Got error from Sim2h server: {:?}", e),
},
WireMessage::Status => error!("Got a Status from the Sim2h server, weird! Ignoring"),
WireMessage::Debug => error!("Got a Debug from the Sim2h server, weird! Ignoring"),
WireMessage::DebugResponse(_) => error!("Got a DebugResponse from the Sim2h server, weird! Ignoring"),
WireMessage::Hello(_) => error!("Got a Hello from the Sim2h server, weird! Ignoring"),
WireMessage::HelloResponse(response) => {
if WIRE_VERSION != response.version {
panic!("holochain SIM2H WIRE_VERSION ({}) does not match SIM2H server WIRE_VERSION ({}) - cannot continue", WIRE_VERSION, response.version);
}
debug!("HelloResponse {:?}", response);
// A redundant count of zero switches the worker to full-sync mode.
self.set_full_sync(response.redundant_count == 0);
}
WireMessage::StatusResponse(_) => error!("Got a StatusResponse from the Sim2h server, weird! Ignoring (I use Hello not Status)"),
// Only an ack whose hash matches the head of the outgoing buffer removes that
// message; any other ack is logged and otherwise ignored.
WireMessage::Ack(hash) => {
if self.outgoing_message_buffer
.first()
.and_then(|buffered_message| Some(buffered_message.hash == hash))
.unwrap_or(false)
{
debug!("WireMessage::Ack received => dequeuing sent message {:?}", message);
self.outgoing_message_buffer.remove(0);
} else {
warn!(
"WireMessage::Ack received that came out of order! Got hash: {}, have top hash: {:?}",
hash,
self.outgoing_message_buffer
.first()
.map(|buffered_message| buffered_message.hash)
);
}
}
};
Ok(())
}
/// Sets the full-sync flag that `is_autonomous_node` consults.
pub fn set_full_sync(&mut self, full_sync: bool) {
self.is_full_sync_DHT = full_sync;
}
/// Reports whether the node has to serve itself: either the full-sync flag is set
/// or the connection is not ready.
///
/// The connection is only probed when the full-sync flag is not set.
fn is_autonomous_node(&mut self) -> bool {
    if self.is_full_sync_DHT {
        return true;
    }
    !self.connection_ready()
}
/// Queues a `Ping` for the server; a failure to queue is only logged.
#[allow(dead_code)]
fn send_ping(&mut self) {
    trace!("Ping");
    match self.send_wire_message(WireMessage::Ping) {
        Ok(()) => {}
        Err(e) => {
            debug!("Ping failed with: {:?}", e);
        }
    }
}
/// Test helper: drops the connection and back-dates the last connection attempt by
/// twice the reconnect interval, so the next `check_reconnect` is not held back by
/// the interval.
///
/// # Panics
/// Panics if `Instant::now()` cannot be moved back that far.
pub fn test_close_connection_cause_reconnect(&mut self) {
    self.connection = None;
    let overdue_by = self.reconnect_interval * 2;
    self.time_of_last_connection_attempt = std::time::Instant::now()
        .checked_sub(overdue_by)
        .unwrap();
}
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_NET)]
impl NetWorker for Sim2hWorker {
/// Stores a message from core in the inbox; it is processed on the next `tick`.
/// Always returns `Ok(())`.
fn receive(&mut self, data: Lib3hClientProtocolWrapped) -> NetResult<()> {
self.inbox.push(data);
Ok(())
}
fn tick(&mut self) -> NetResult<bool> {
let clock = std::time::SystemTime::now();
let mut did_something = false;
if self.ws_frame.is_none() {
self.ws_frame = Some(WsFrame::default());
}
if self.connection_ready() {
self.reset_backoff();
self.check_batched_messages();
if self.try_send_from_outgoing_buffer() {
did_something = true;
}
match self
.connection
.as_mut()
.unwrap()
.read(&mut self.ws_frame.as_mut().unwrap())
{
Ok(_) => {
did_something = true;
let frame = self.ws_frame.take().unwrap();
match frame {
WsFrame::Binary(payload) => {
let payload: Opaque = payload.into();
match WireMessage::try_from(&payload) {
Ok(wire_message) =>
if let Err(error) = self.handle_server_message(wire_message) {
error!("Error handling server message in Sim2hWorker: {:?}", error);
},
Err(error) =>
error!(
"Could not deserialize received payload into WireMessage!\nError: {:?}\nPayload was: {:?}",
error,
payload
)
}
}
WsFrame::Ping(_) => (),
frame => {
trace!("unhandled websocket message type: {:?}", frame);
}
}
}
Err(e) if e.would_block() => (),
Err(e) => {
error!(
"TransportError trying to read message from sim2h server: {:?}",
e
);
self.connection = None;
self.check_reconnect();
}
}
} else {
self.check_reconnect();
}
let client_messages = self.inbox.drain(..).collect::<Vec<_>>();
for data in client_messages {
debug!("CORE >> Sim2h: {:?}", data);
if let Err(error) = self.handle_client_message(data) {
error!("Error handling client message in Sim2hWorker: {:?}", error);
}
did_something = true;
}
let server_messages = self.to_core.drain(..).collect::<Vec<_>>();
for data in server_messages {
debug!("Sim2h >> CORE: {:?}", data);
if let Err(error) = self.handler.handle(Ok(data)) {
error!(
"Error handling server message in core's handler: {:?}",
error
);
}
did_something = true;
}
if did_something {
let latency = clock.elapsed().unwrap().as_millis();
let metric_name = "sim2h_worker.tick.latency";
let metric = holochain_metrics::Metric::new(
metric_name,
None,
Some(clock.into()),
latency as f64,
);
self.metric_publisher.write().unwrap().publish(&metric);
}
Ok(did_something)
}
/// Returns the sim2h server URL; never `None`.
fn p2p_endpoint(&self) -> Option<url::Url> {
Some(self.server_url.clone().into())
}
/// Always returns an empty string wrapped in `Some`.
fn endpoint(&self) -> Option<String> {
Some("".into())
}
}