use overclock::core::*;
struct Incrementer;
#[async_trait::async_trait]
impl<S> Actor<S> for Incrementer
where
S: SupHandle<Self>,
{
type Data = prometheus::IntGauge;
type Channel = IntervalChannel<10>;
async fn init(&mut self, rt: &mut Rt<Self, S>) -> ActorResult<Self::Data> {
log::info!(
"scope_id: {}, {} is {}",
rt.scope_id(),
rt.service().actor_type_name(),
rt.service().status(),
);
let gauge: prometheus::IntGauge =
prometheus::core::GenericGauge::new("magnitude", "Decrementer and Incrementer gauge resource").unwrap();
rt.register(gauge.clone()).ok();
rt.add_resource(gauge.clone()).await;
Ok(gauge)
}
async fn run(&mut self, rt: &mut Rt<Self, S>, counter: Self::Data) -> ActorResult<()> {
while let Some(_instant) = rt.inbox_mut().next().await {
counter.inc();
}
Ok(())
}
}
struct Decrementer;
#[async_trait::async_trait]
impl<S> Actor<S> for Decrementer
where
S: SupHandle<Self>,
{
type Data = prometheus::IntGauge;
type Channel = IntervalChannel<10>;
async fn init(&mut self, rt: &mut Rt<Self, S>) -> ActorResult<Self::Data> {
log::info!(
"scope_id: {}, {} is {}",
rt.scope_id(),
rt.service().actor_type_name(),
rt.service().status()
);
if let Some(resource_scope_id) = rt.highest_scope_id::<Self::Data>().await {
let counter = rt.link::<Self::Data>(resource_scope_id, true).await.map_err(|e| {
log::error!("{:?}", e);
ActorError::exit_msg(format!("{:?}", e))
})?;
Ok(counter)
} else {
Err(ActorError::exit_msg(
"Unable to find scope id for IntGauge data resource",
))
}
}
async fn run(&mut self, rt: &mut Rt<Self, S>, counter: Self::Data) -> ActorResult<()> {
while let Some(_instant) = rt.inbox_mut().next().await {
counter.dec();
}
Ok(())
}
}
struct Overclock;
enum OverclockEvent {
Shutdown,
Microservice(ScopeId, Service),
}
impl ShutdownEvent for OverclockEvent {
fn shutdown_event() -> Self {
Self::Shutdown
}
}
impl<T> ServiceEvent<T> for OverclockEvent {
fn report_event(scope_id: ScopeId, service: Service) -> Self {
Self::Microservice(scope_id, service)
}
fn eol_event(scope_id: ScopeId, service: Service, _actor: T, _r: ActorResult<()>) -> Self {
Self::Microservice(scope_id, service)
}
}
#[async_trait::async_trait]
impl<S> Actor<S> for Overclock
where
S: SupHandle<Self>,
{
type Data = ();
type Channel = UnboundedChannel<OverclockEvent>;
async fn init(&mut self, rt: &mut Rt<Self, S>) -> ActorResult<Self::Data> {
log::info!("Overclock: {}", rt.service().status());
let incrementer = Incrementer;
rt.start(Some("incrementer".into()), incrementer).await.map_err(|e| {
log::error!("{:?}", e);
ActorError::exit_msg(format!("{:?}", e))
})?;
let decrementer = Decrementer;
rt.start(Some("decrementer".into()), decrementer).await.map_err(|e| {
log::error!("{:?}", e);
ActorError::exit_msg(format!("{:?}", e))
})?;
Ok(())
}
async fn run(&mut self, rt: &mut Rt<Self, S>, _deps: Self::Data) -> ActorResult<()> {
log::info!("Overclock: {}", rt.service().status());
while let Some(event) = rt.inbox_mut().next().await {
match event {
OverclockEvent::Shutdown => {
rt.stop().await;
log::info!("overclock got shutdown signal");
if rt.microservices_stopped() {
rt.inbox_mut().close();
}
}
OverclockEvent::Microservice(scope_id, service) => {
log::info!(
"Microservice: {}, dir: {:?}, status: {}",
service.actor_type_name(),
service.directory(),
service.status()
);
rt.upsert_microservice(scope_id, service);
if rt.microservices_stopped() {
rt.inbox_mut().close();
}
}
}
}
Ok(())
}
}
#[tokio::main]
async fn main() {
#[cfg(not(feature = "console"))]
{
let env = env_logger::Env::new().filter_or("RUST_LOG", "info");
env_logger::Builder::from_env(env).init();
}
let overclock = Overclock;
let backserver_addr = "127.0.0.1:9000"
.parse::<std::net::SocketAddr>()
.expect("parsable socket addr");
let runtime = Runtime::new(Some("overclock".into()), overclock)
.await
.expect("Runtime to build")
.backserver(backserver_addr)
.await
.expect("Websocket server to run");
runtime.block_on().await.expect("Runtime to shutdown gracefully");
}