use std::{sync::Arc, time::Duration};
use anyhow::{Context, Result};
use clap::Parser;
use sep2_client::{
client::{Client, SEPResponse},
device::SEDevice,
event::{EIStatus, EventCallback, EventInstance, Schedule, Scheduler},
pubsub::ClientNotifServer,
};
use sep2_common::packages::{
dcap::DeviceCapability,
der::{DERControl, DERControlList, DERProgramList, DefaultDERControl},
edev::EndDevice,
fsa::FunctionSetAssignmentsList,
identification::ResponseStatus,
metering::Reading,
primitives::Uint32,
pubsub::Notification,
types::DeviceCategoryType,
};
use simple_logger::SimpleLogger;
use tokio::sync::{
mpsc::{self, Receiver},
RwLock,
};
use typemap_rev::{TypeMap, TypeMapKey};
/// Marker type used as a `TypeMap` key.
///
/// It carries no data of its own; its `TypeMapKey` implementation below
/// declares that the value stored under this key is a `Reading`.
struct ReadingResource;
impl TypeMapKey for ReadingResource {
// The value type associated with the `ReadingResource` key.
type Value = Reading;
}
/// Stateless, field-less type that this file uses as the implementor of
/// `EventCallback<DERControl>`.
#[derive(Default, Clone)]
struct Handler {}
/// Reports every `DERControl` status update on stdout.
impl EventCallback<DERControl> for Handler {
    /// Prints a one-line description of the event's current status, followed
    /// by the `Debug` rendering of the event, and returns that status
    /// converted into a `ResponseStatus`.
    async fn event_update(&self, event: &EventInstance<DERControl>) -> ResponseStatus {
        // Every status is reported as "<label>: <event>"; only the label varies.
        let label = match event.status() {
            EIStatus::Scheduled => "Received DERControl",
            EIStatus::Active => "DERControl Started",
            EIStatus::Cancelled | EIStatus::CancelledRandom => "DERControl Cancelled",
            EIStatus::Complete => "DERControl Complete",
            // Previously printed "DERControl Started" (copy-paste of the
            // `Active` arm), which made a superseded control look like it had
            // just begun.
            EIStatus::Superseded => "DERControl Superseded",
        };
        println!("{label}: {:?}", event.event());
        event.status().into()
    }
}
async fn poll_derprograms(client: &Client, path: &str) -> Result<Receiver<DERProgramList>> {
let dcap = client.get::<DERProgramList>(path).await?;
let (tx, rx) = mpsc::channel::<DERProgramList>(100);
client
.start_poll(
dcap.href.unwrap(),
Some(Uint32(1)),
move |dcap: DERProgramList| {
let tx = tx.clone();
async move { tx.send(dcap).await.unwrap() }
},
)
.await;
Ok(rx)
}
/// Walks every `DERProgram` in `derpl` and feeds its controls to `schedule`.
///
/// For each program:
/// * if it links to a `DERControlList`, that list is fetched and each of its
///   `DERControl`s is added to the schedule (with `0` as the final
///   `add_event` argument);
/// * otherwise, if it links to a `DefaultDERControl`, that resource is fetched
///   and the result discarded;
/// * otherwise a warning is logged.
///
/// # Errors
///
/// Returns the first error produced by a GET; remaining programs are skipped.
async fn process_derpl_task(
    client: &Client,
    mut schedule: Schedule<DERControl>,
    derpl: DERProgramList,
) -> Result<()> {
    for program in derpl.der_program {
        if let Some(list_link) = &program.der_control_list_link {
            // A control list takes precedence over a default control.
            let controls: DERControlList = client.get(&list_link.href).await?;
            for control in controls.der_control {
                schedule.add_event(control, &program, 0).await;
            }
        } else if let Some(default_link) = &program.default_der_control_link {
            // Fetched only to confirm it can be retrieved; the value is unused.
            let _: DefaultDERControl = client.get(&default_link.href).await?;
        } else {
            log::warn!("Found a DERProgram with no DERControls or default");
        }
    }
    Ok(())
}
/// Registers the end device with the server and starts feeding discovered
/// `DERControl`s into `schedule`.
///
/// Steps, in order:
/// 1. POST the device's `edev` to `/edev`.
/// 2. GET the created `EndDevice` from the returned location.
/// 3. GET its `FunctionSetAssignmentsList` (requesting `all` entries via `?l=`).
/// 4. Pick the first function set assignment that has a DER program list link.
/// 5. Start polling that `DERProgramList` and spawn a background task that
///    passes every polled list to `process_derpl_task`.
///
/// If the POST succeeds but the response is not `SEPResponse::Created`, a
/// warning is logged and `Ok(())` is returned without starting any polling.
///
/// # Errors
///
/// Returns an error if any request fails, or if the server response is missing
/// the location header, the function set assignments link, an `all` count, or
/// a DER program list link. These cases previously panicked via `unwrap()`.
async fn setup_schedule(
    client: &Client,
    edr: Arc<RwLock<SEDevice>>,
    schedule: Schedule<DERControl>,
) -> Result<()> {
    let res = client
        .post("/edev", &edr.read().await.edev)
        .await
        .context("Failed to POST EndDevice to /edev")?;
    let SEPResponse::Created(loc) = res else {
        log::warn!("POST to /edev did not return Created; DERProgramList polling not started");
        return Ok(());
    };
    let loc = loc.context("No location header provided.")?;
    let end_device: EndDevice = client
        .get(&loc)
        .await
        .context("Failed to retrieve EndDevice resource")?;

    let fsal_link = end_device
        .function_set_assignments_list_link
        .context("EndDevice did not contain a FunctionSetAssignmentsList link")?;
    let fsal_count = fsal_link
        .all
        .context("FunctionSetAssignmentsList link did not specify 'all'")?;
    let fsal: FunctionSetAssignmentsList = client
        .get(&format!("{}?l={}", fsal_link.href, fsal_count))
        .await
        .context("Failed to retrieve FunctionSetAssignmentsList resource")?;

    // Only the first assignment carrying a DER program list link is used.
    let derpll = fsal
        .function_set_assignments
        .iter()
        .find_map(|fsa| fsa.der_program_list_link.as_ref())
        .context("FSA List did not contain a DER Program List Link")?;
    let derpl_count = derpll
        .all
        .context("DERProgramList link did not specify 'all'")?;
    let mut rx = poll_derprograms(client, &format!("{}?l={}", derpll.href, derpl_count))
        .await
        .context("Failed to retrieve an initial instance of a DERProgramList")?;

    // The task is detached: it runs until the polling side closes the channel.
    tokio::spawn({
        let schedule = schedule.clone();
        let client = client.clone();
        async move {
            while let Some(derpl) = rx.recv().await {
                // A failure on one list is logged and does not stop the loop.
                if let Err(e) = process_derpl_task(&client, schedule.clone(), derpl).await {
                    log::warn!("Failed to process DERPL with reason: {e}");
                }
            }
        }
    });
    Ok(())
}
/// Notification handler for `DeviceCapability` resources.
///
/// Prints the `Debug` form of the received notification to stdout and always
/// answers with `SEPResponse::Created(None)`.
async fn incoming_dcap(notif: Notification<DeviceCapability>) -> SEPResponse {
    println!("Notif Received: {notif:?}");
    SEPResponse::Created(None)
}
// Command-line arguments, parsed with clap's derive API. No field carries an
// `#[arg(...)]` attribute.
//
// Plain `//` comments are used here on purpose: `///` doc comments on a
// clap-derive struct or its fields are picked up as help text and would
// change the program's `--help` output.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
// Passed as the first argument to `Client::new_https` in `main`.
target_addr: String,
// Joined with `notif_port` as "addr:port" and passed to `ClientNotifServer::new`.
notif_addr: String,
// Port half of the notification server address (see `notif_addr`).
notif_port: u16,
// Passed to `SEDevice::new_from_cert`, `with_https` and `Client::new_https`.
// NOTE(review): presumably a path to the client certificate file; confirm.
cert: String,
// Passed to `with_https` and `Client::new_https`.
// NOTE(review): presumably a path to the private key file; confirm.
key: String,
// Passed to `with_https` and `Client::new_https`.
// NOTE(review): presumably a path to the CA certificate; confirm.
ca: String,
}
/// Entry point: starts a notification server, creates an HTTPS client and a
/// `DERControl` scheduler, kicks off schedule discovery, then waits for the
/// notification server task to finish.
///
/// # Errors
///
/// Returns an error if logger initialisation, device construction from the
/// certificate, notification-server setup, or client construction fails (the
/// first two previously panicked), or if the notification server task fails.
/// A failure in `setup_schedule` is only logged as a warning.
#[tokio::main]
async fn main() -> Result<()> {
    SimpleLogger::new()
        .with_level(log::LevelFilter::Debug)
        .init()
        .context("Failed to initialise logger")?;
    let args = Args::parse();

    // Shared type-keyed store; the "/reading" route writes into it.
    let state: Arc<RwLock<TypeMap>> = Arc::new(RwLock::new(TypeMap::new()));

    // The certificate argument is user input, so report a bad value as an
    // error rather than panicking.
    let edr = SEDevice::new_from_cert(&args.cert, DeviceCategoryType::all())
        .context("Failed to create SEDevice from the provided certificate")?;
    let edr = Arc::new(RwLock::new(edr));

    let notifs = ClientNotifServer::new(&format!("{}:{}", &args.notif_addr, &args.notif_port))?
        .with_https(&args.cert, &args.key, &args.ca)?
        .add("/reading", {
            let notif_state = state.clone();
            move |notif: Notification<Reading>| {
                let notif_state = notif_state.clone();
                async move {
                    match notif.resource {
                        // Store the incoming reading under the `ReadingResource` key.
                        Some(r) => {
                            notif_state.write().await.insert::<ReadingResource>(r);
                            SEPResponse::Created(None)
                        }
                        // A notification without a resource is rejected.
                        None => SEPResponse::BadRequest(None),
                    }
                }
            }
        })
        .add("/dcap", incoming_dcap);
    // NOTE(review): the ctrl-c future is presumably the server's shutdown
    // signal; confirm against `ClientNotifServer::run`.
    let notif_handle = tokio::spawn(notifs.run(tokio::signal::ctrl_c()));

    let client = Client::new_https(
        &args.target_addr,
        &args.cert,
        &args.key,
        &args.ca,
        None,
        None,
    )?;
    let handler = Handler::default();
    let schedule: Schedule<DERControl> = Scheduler::new(
        client.clone(),
        edr.clone(),
        handler,
        // NOTE(review): ten minutes; meaning is defined by `Scheduler::new` — confirm.
        Duration::from_secs(60 * 10),
    );

    // Best effort: the notification server keeps running even if schedule
    // setup fails.
    if let Err(e) = setup_schedule(&client, edr, schedule).await {
        log::warn!("Failed to setup schedule with reason {}", e);
    }
    notif_handle.await?
}