use crate as rt;
use rt::profiled_actor::{self, MetricEntry};
use rt::{
define_sim_sync_accessor, define_sync_accessor,
rtactor_macros::{ResponseEnum, SyncNotifier, SyncRequester},
};
use std::{
cell::Cell,
collections::{BTreeMap, BTreeSet},
};
#[derive(ResponseEnum, SyncRequester)]
pub enum Request {
RegisterObserver {
addr: rt::Addr,
},
UnregisterObserver {
addr: rt::Addr,
},
RegisterProfiledActors {
names_addresses: Vec<(String, rt::Addr)>,
},
UnregisterProfiledActors {
selection: ActorSelection,
},
}
define_sim_sync_accessor!(SimSyncAccessor, SyncRequester, SyncNotifier);
define_sync_accessor!(SyncAccessor, SyncRequester, SyncNotifier);
#[derive(SyncNotifier)]
pub enum Notification {
SnapshotMetrics { selection: ActorSelection },
}
pub enum ActorSelection {
All,
SelectedNames(Vec<String>),
SelectedAddresses(Vec<rt::Addr>),
}
pub mod observer {
use rtactor::{define_sim_sync_accessor, define_sync_accessor, rtactor_macros::SyncNotifier};
use crate::profiled_actor::MetricEntry;
use rtactor as rt;
#[derive(SyncNotifier)]
pub enum Notification {
MetricUpdate {
timestamp: rt::Instant,
names_metric_lists: Vec<(String, Vec<MetricEntry>)>,
},
}
define_sim_sync_accessor!(SimSyncAccessor, SyncNotifier);
define_sync_accessor!(SyncAccessor, SyncNotifier);
}
pub struct ProfilingAggregator {
actor_entry_dict: BTreeMap<String, ActorEntry>,
observer_set: BTreeSet<rt::Addr>,
state: State,
}
impl ProfilingAggregator {
pub fn new() -> Self {
Self {
actor_entry_dict: BTreeMap::new(),
observer_set: BTreeSet::new(),
state: State::Idle,
}
}
}
impl Default for ProfilingAggregator {
fn default() -> Self {
Self::new()
}
}
impl rt::Behavior for ProfilingAggregator {
fn process_message(&mut self, context: &mut rt::ProcessContext, msg: &rt::Message) {
match msg {
rt::Message::Request(request) => self.process_request(context, request),
rt::Message::Response(response) => self.process_response(context, response),
rt::Message::Notification(notification) => {
self.process_notification(context, notification)
}
}
}
}
const INITIAL_METRIC_LIST_CAPACITY: usize = 20;
enum State {
Idle,
Processing {
snapshot_list: Vec<String>,
timestamp: rt::Instant, requested_index: usize,
request_id: rt::RequestId,
},
}
struct ActorEntry {
addr: rt::Addr,
metric_list: Cell<Option<Vec<MetricEntry>>>,
}
impl ActorEntry {
pub fn new(addr: &rt::Addr) -> Self {
Self {
addr: addr.clone(),
metric_list: Cell::new(Some(Vec::with_capacity(INITIAL_METRIC_LIST_CAPACITY))),
}
}
}
impl ProfilingAggregator {
fn convert_selection_to_names(&self, selection: &ActorSelection) -> Vec<String> {
match selection {
ActorSelection::All => self.actor_entry_dict.keys().cloned().collect(),
ActorSelection::SelectedNames(names) => names.clone(),
ActorSelection::SelectedAddresses(addresses) => {
let mut names = Vec::with_capacity(addresses.len());
for it in self.actor_entry_dict.iter() {
if addresses.contains(&it.1.addr) {
names.push(it.0.clone());
}
}
names
}
}
}
fn process_request(&mut self, context: &mut rt::ProcessContext, request: &rt::Request) {
if let Some(data) = request.data.downcast_ref() {
match data {
Request::RegisterObserver { addr } => {
self.observer_set.insert(addr.clone());
context.send_response(request, Response::RegisterObserver());
}
Request::UnregisterObserver { addr } => {
self.observer_set.remove(addr);
context.send_response(request, Response::UnregisterObserver());
}
Request::RegisterProfiledActors { names_addresses } => {
for (name, addr) in names_addresses {
self.actor_entry_dict
.insert(name.clone(), ActorEntry::new(addr));
}
context.send_response(request, Response::RegisterProfiledActors());
}
Request::UnregisterProfiledActors { selection } => {
match selection {
ActorSelection::All => self.actor_entry_dict.clear(),
ActorSelection::SelectedNames(names) => {
for name in names {
self.actor_entry_dict.remove(name);
}
}
ActorSelection::SelectedAddresses(addresses) => {
self.actor_entry_dict
.retain(|_, entry| !addresses.contains(&entry.addr));
}
};
context.send_response(request, Response::UnregisterProfiledActors());
}
}
}
}
fn process_next_snapshot_and_notify(&mut self, context: &mut rt::ProcessContext) {
let mut list_and_timestamp_to_notify = None;
match &mut self.state {
State::Idle => {}
State::Processing {
snapshot_list,
timestamp,
requested_index,
request_id,
} => {
let mut request_sent = false;
for name in snapshot_list.iter().skip(*requested_index) {
if let Some(entry) = self.actor_entry_dict.get(name) {
let list = match entry.metric_list.take() {
Some(mut list) => {
list.clear();
list
}
None => Vec::with_capacity(INITIAL_METRIC_LIST_CAPACITY),
};
*request_id = context.send_request(
&entry.addr,
profiled_actor::Request::GetMetrics {
metric_list: Cell::new(Some(list)),
},
);
request_sent = true;
break;
} else {
*requested_index += 1;
}
}
if !request_sent {
list_and_timestamp_to_notify = Some((snapshot_list.clone(), *timestamp));
}
}
}
if let Some((list_to_notify, timestamp)) = list_and_timestamp_to_notify {
self.notify_observers(context, &list_to_notify, ×tamp);
self.state = State::Idle;
}
}
fn process_response(&mut self, context: &mut rt::ProcessContext, response: &rt::Response) {
if let State::Processing {
snapshot_list,
requested_index,
request_id,
..
} = &mut self.state
{
match &response.result {
Ok(data) => {
if let Some(data) = data.downcast_ref() {
match data {
profiled_actor::Response::GetMetrics(entry_list) => {
if response.id_eq(*request_id) {
if let Some(entry) = self
.actor_entry_dict
.get_mut(&*snapshot_list[*requested_index])
{
entry.metric_list.swap(entry_list)
}
*requested_index += 1;
self.process_next_snapshot_and_notify(context);
}
}
}
}
}
Err(error_status) => {
if let Some(data) = error_status.request_data.downcast_ref() {
match data {
profiled_actor::Response::GetMetrics(_) => {
if response.id_eq(*request_id) {
self.process_next_snapshot_and_notify(context);
}
}
}
}
}
}
}
}
fn notify_observers(
&mut self,
context: &mut rt::ProcessContext,
names: &Vec<String>,
timestamp: &rt::Instant,
) {
let mut names_metric_lists = Vec::with_capacity(names.len());
for name in names {
if let Some(entry) = self.actor_entry_dict.get_mut(name) {
let list = match entry.metric_list.get_mut() {
Some(list) => list.clone(),
None => Vec::new(),
};
names_metric_lists.push((name.clone(), list));
}
}
for observer in &self.observer_set {
let _ = context.send_notification(
observer,
observer::Notification::MetricUpdate {
names_metric_lists: names_metric_lists.clone(),
timestamp: *timestamp,
},
);
}
}
fn process_notification(
&mut self,
context: &mut rt::ProcessContext,
notification: &rt::Notification,
) {
if let Some(data) = notification.data.downcast_ref() {
match data {
Notification::SnapshotMetrics { selection } => {
let new_snapshot_list = self.convert_selection_to_names(selection);
match &mut self.state {
State::Idle => {
self.state = State::Processing {
snapshot_list: new_snapshot_list,
timestamp: context.now(),
requested_index: 0,
request_id: 0,
};
self.process_next_snapshot_and_notify(context);
}
State::Processing { snapshot_list, .. } => {
for name in new_snapshot_list {
if !snapshot_list.contains(&name) {
snapshot_list.push(name);
}
}
}
}
}
}
}
}
}