use crate::{
client::{MicrogridApiClient, instruction::Instruction, retry_tracker::RetryTracker},
proto::microgrid::{
ListElectricalComponentConnectionsRequest, ListElectricalComponentsRequest,
ReceiveElectricalComponentTelemetryStreamRequest,
ReceiveElectricalComponentTelemetryStreamResponse,
},
};
use chrono::DateTime;
use futures::{Stream, StreamExt};
use std::collections::HashMap;
use tokio::{
select,
sync::{broadcast, mpsc},
};
use tracing::Instrument as _;
use crate::{Error, proto::common::microgrid::electrical_components::ElectricalComponentTelemetry};
/// Lifecycle notification for a per-component telemetry stream.
///
/// Every variant carries the ID of the electrical component whose stream the
/// notification is about.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamStatus {
    /// The stream could not be started, or it stopped after it had been started.
    Failed(u64),
    /// The stream was opened successfully.
    Connected(u64),
    /// The stream was shut down because it had no subscribers left.
    Ended(u64),
}
/// Actor that owns a microgrid API client and serves the instructions it
/// receives over an mpsc channel.
pub(super) struct MicrogridClientActor<T> {
/// API client used to perform the requests.
client: T,
/// Receiving end of the channel on which instructions arrive.
instructions_rx: mpsc::Receiver<Instruction>,
}
impl<T: MicrogridApiClient> MicrogridClientActor<T> {
pub(super) fn new_from_client(client: T, instructions_rx: mpsc::Receiver<Instruction>) -> Self {
Self {
client,
instructions_rx,
}
}
pub(super) async fn run(mut self) {
let mut component_streams: HashMap<u64, broadcast::Sender<ElectricalComponentTelemetry>> =
HashMap::new();
let (stream_status_tx, mut stream_status_rx) = mpsc::channel(50);
let mut retry_timer = tokio::time::interval(std::time::Duration::from_secs(1));
retry_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut components_to_retry = HashMap::new();
loop {
select! {
instruction = self.instructions_rx.recv() => {
if let Err(e) = handle_instruction(
&mut self.client,
&mut component_streams,
instruction,
stream_status_tx.clone(),
).await {
tracing::error!("MicrogridClientActor: Error handling instruction: {e}");
}
}
stream_status = stream_status_rx.recv() => {
match stream_status {
Some(StreamStatus::Failed(component_id)) => {
components_to_retry.entry(component_id).or_insert_with(
RetryTracker::new
).mark_new_failure();
}
Some(StreamStatus::Connected(component_id)) => {
components_to_retry.remove(&component_id);
}
Some(StreamStatus::Ended(component_id)) => {
components_to_retry.remove(&component_id);
}
None => {
tracing::error!("MicrogridClientActor: Stream status channel closed, exiting.");
return;
}
}
}
now = retry_timer.tick() => {
if let Err(e) = handle_retry_timer(
&mut self.client,
&mut component_streams,
&mut components_to_retry,
stream_status_tx.clone(),
now,
).await {
tracing::error!("MicrogridClientActor: Error handling retry timer: {e}");
}
}
}
}
}
}
async fn handle_instruction<T: MicrogridApiClient>(
client: &mut T,
component_streams: &mut HashMap<u64, broadcast::Sender<ElectricalComponentTelemetry>>,
instruction: Option<Instruction>,
stream_status_tx: mpsc::Sender<StreamStatus>,
) -> Result<(), Error> {
match instruction {
Some(Instruction::ReceiveElectricalComponentTelemetryStream {
electrical_component_id,
response_tx,
}) => {
if let Some(stream) = component_streams.get(&electrical_component_id) {
response_tx
.send(stream.subscribe())
.map_err(|_| Error::internal("failed to send response"))?;
return Ok(());
}
let (tx, rx) = broadcast::channel::<ElectricalComponentTelemetry>(100);
component_streams.insert(electrical_component_id, tx.clone());
start_electrical_component_telemetry_stream(
client,
electrical_component_id,
tx,
stream_status_tx,
)
.await;
response_tx.send(rx).map_err(|_| {
tracing::error!("failed to send response");
Error::internal("failed to send response")
})?;
}
Some(Instruction::ListElectricalComponents {
response_tx,
electrical_component_ids,
electrical_component_categories,
}) => {
let components = client
.list_electrical_components(ListElectricalComponentsRequest {
electrical_component_ids,
electrical_component_categories: electrical_component_categories
.into_iter()
.map(|c| c as i32)
.collect(),
})
.await
.map_err(|e| Error::connection_failure(format!("list_components failed: {e}")))
.map(|r| r.into_inner().electrical_components);
response_tx
.send(components)
.map_err(|_| Error::internal("failed to send response"))?;
}
Some(Instruction::ListElectricalComponentConnections {
response_tx,
source_electrical_component_ids,
destination_electrical_component_ids,
}) => {
let connections = client
.list_electrical_component_connections(ListElectricalComponentConnectionsRequest {
source_electrical_component_ids,
destination_electrical_component_ids,
})
.await
.map_err(|e| Error::connection_failure(format!("list_connections failed: {e}")))
.map(|r| r.into_inner().electrical_component_connections);
response_tx
.send(connections)
.map_err(|_| Error::internal("failed to send response"))?;
}
Some(Instruction::AugmentElectricalComponentBounds {
electrical_component_id,
target_metric,
bounds,
request_lifetime,
response_tx,
}) => {
let response = client
.augment_electrical_component_bounds(
crate::proto::microgrid::AugmentElectricalComponentBoundsRequest {
electrical_component_id,
target_metric: target_metric as i32,
bounds,
request_lifetime: request_lifetime.and_then(|d| {
let secs = d.num_seconds();
u64::try_from(secs).ok()
}),
},
)
.await
.map_err(|e| {
Error::api_server_error(format!(
"augment_electrical_component_bounds failed: {e}"
))
})
.map(|r| {
r.into_inner().valid_until_time.and_then(|t| {
match DateTime::from_timestamp(t.seconds, t.nanos as u32) {
dt @ Some(_) => dt,
None => {
tracing::error!(
concat!(
"Received invalid valid_until_time in ",
"AugmentElectricalComponentBoundsResponse: {:?}"
),
t
);
None
}
}
})
});
response_tx
.send(response)
.map_err(|_| Error::internal("failed to send response"))?;
}
None => {}
}
Ok(())
}
async fn handle_retry_timer<T: MicrogridApiClient>(
client: &mut T,
component_streams: &mut HashMap<u64, broadcast::Sender<ElectricalComponentTelemetry>>,
components_to_retry: &mut HashMap<u64, RetryTracker>,
stream_status_tx: mpsc::Sender<StreamStatus>,
now: tokio::time::Instant,
) -> Result<(), Error> {
for item in components_to_retry.iter_mut() {
if let Some(retry_time) = item.1.next_retry_time() {
if retry_time > now {
continue;
}
item.1.mark_new_retry();
let (component_id, _) = item;
if let Some(tx) = component_streams.get(component_id).cloned() {
start_electrical_component_telemetry_stream(
client,
*component_id,
tx,
stream_status_tx.clone(),
)
.await;
} else {
tracing::error!("Component stream not found for retry: {component_id}");
return Err(Error::internal(format!(
"Component stream not found for retry: {component_id}"
)));
}
}
}
Ok(())
}
/// Opens the telemetry stream for `electrical_component_id` and spawns a task
/// that forwards its messages to `tx`.
///
/// The outcome is reported on `stream_status_tx`: `Failed` when the stream
/// cannot be opened, `Connected` when it can. The forwarding task is spawned
/// only after the `Connected` notification has been delivered.
async fn start_electrical_component_telemetry_stream<T: MicrogridApiClient>(
    client: &mut T,
    electrical_component_id: u64,
    tx: broadcast::Sender<ElectricalComponentTelemetry>,
    stream_status_tx: mpsc::Sender<StreamStatus>,
) {
    let request = ReceiveElectricalComponentTelemetryStreamRequest {
        electrical_component_id,
        filter: None,
    };
    let opened = client
        .receive_electrical_component_telemetry_stream(request)
        .await;

    let response = match opened {
        Ok(response) => response,
        Err(e) => {
            // Best effort: a failure to deliver the notification is ignored.
            let _ = stream_status_tx
                .send(StreamStatus::Failed(electrical_component_id))
                .await;
            tracing::debug!("Failed to start telemetry stream for {electrical_component_id}: {e}");
            return;
        }
    };
    let stream = response.into_inner();

    let notified = stream_status_tx
        .send(StreamStatus::Connected(electrical_component_id))
        .await;
    if let Err(e) = notified {
        tracing::error!(
            "Failed to send stream connected message for {electrical_component_id}: {e}"
        );
        return;
    }

    let forwarder = run_electrical_component_telemetry_stream(
        stream,
        electrical_component_id,
        tx,
        stream_status_tx,
    );
    tokio::spawn(forwarder.in_current_span());
}
/// Forwards telemetry from `stream` to the broadcast channel `tx` until the
/// stream stops or nobody is subscribed any more.
///
/// The subscriber count is checked before each message is awaited. When it is
/// zero, `Ended` is reported on `stream_status_tx` and the task returns. When
/// the stream finishes or yields an error, `Failed` is reported instead.
/// Responses without telemetry are logged and skipped.
async fn run_electrical_component_telemetry_stream(
    mut stream: impl Stream<
        Item = Result<ReceiveElectricalComponentTelemetryStreamResponse, tonic::Status>,
    > + Unpin,
    electrical_component_id: u64,
    tx: broadcast::Sender<ElectricalComponentTelemetry>,
    stream_status_tx: mpsc::Sender<StreamStatus>,
) {
    loop {
        if tx.receiver_count() == 0 {
            tracing::debug!(
                "Dropping ComponentData stream for component_id:{:?}",
                electrical_component_id
            );
            if let Err(e) = stream_status_tx
                .send(StreamStatus::Ended(electrical_component_id))
                .await
            {
                tracing::error!(
                    "Failed to send stream ended message for {:?}: {:?}",
                    electrical_component_id,
                    e
                );
            }
            return;
        }

        match stream.next().await {
            Some(Ok(ReceiveElectricalComponentTelemetryStreamResponse {
                telemetry: Some(data),
            })) => {
                // A send error only means there is no subscriber right now;
                // the check at the top of the loop deals with that.
                let _ = tx.send(data);
            }
            Some(Ok(ReceiveElectricalComponentTelemetryStreamResponse { telemetry: None })) => {
                tracing::warn!(
                    "get_component_data stream returned empty data for {}",
                    electrical_component_id
                );
            }
            Some(Err(e)) => {
                // The stream yielded an error.
                tracing::warn!(
                    "get_component_data stream failed for {}: {:?}",
                    electrical_component_id,
                    e
                );
                break;
            }
            None => {
                // The stream finished without yielding an error.
                tracing::error!(
                    "get_component_data stream ended for {}",
                    electrical_component_id,
                );
                break;
            }
        }
    }

    // Reached only via `break`: report the stop as a failure.
    if let Err(e) = stream_status_tx
        .send(StreamStatus::Failed(electrical_component_id))
        .await
    {
        tracing::error!(
            "Failed to send stream stopped message for {:?}: {:?}",
            electrical_component_id,
            e
        );
    }
}