use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use coapum::{
dtls::{
Error,
cipher_suite::CipherSuiteId,
config::{Config, ExtendedMasterSecretType},
},
extract::{Cbor, Identity, Path, State, StatusCode},
observer::memory::MemObserver,
router::RouterBuilder,
serve,
};
use serde::{Deserialize, Serialize};
/// Shared, thread-safe table mapping a DTLS PSK identity (as text) to its key bytes.
type PskStore = Arc<RwLock<HashMap<String, Vec<u8>>>>;

/// Demo pre-shared key registered for the `goobie!` identity in `main`.
///
/// The key material is the 32 ASCII bytes of this text itself, not the 16 bytes
/// the hex digits would decode to. Hard-coded for the example only.
const PSK: &[u8] = b"63ef2024b1de6417f856fab7005d38f6";
/// Sensor snapshot for a single device.
///
/// Carried as the CBOR payload of the device-state handlers and stored per
/// device id inside `AppState`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DeviceState {
    /// Temperature reading; the update handler logs it with a `°C` suffix.
    temperature: f32,
    /// Humidity reading; the update handler logs it as a percentage.
    humidity: f32,
    /// Battery charge; the update handler logs it as a percentage.
    battery_level: u8,
}
/// Generic acknowledgement body returned (as CBOR) by `update_device_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ApiResponse {
    /// Short outcome tag; the only value produced in this file is `"success"`.
    status: String,
    /// Human-readable description of what happened.
    message: String,
}
/// Application state handed to the router and extracted by handlers via `State`.
///
/// Cloning is cheap: the map lives behind an `Arc`, so every clone shares the
/// same underlying table.
#[derive(Debug, Clone)]
struct AppState {
    /// Last known state per device id, guarded by an async-aware mutex.
    device_states: Arc<tokio::sync::Mutex<HashMap<String, DeviceState>>>,
}
/// Identity conversion: an `AppState` can be viewed as a reference to itself.
// NOTE(review): presumably needed to satisfy a bound of coapum's `State`
// extractor — confirm against the coapum documentation.
impl AsRef<AppState> for AppState {
    fn as_ref(&self) -> &Self {
        self
    }
}
/// Stores the CBOR-encoded `DeviceState` sent by a client under the device id
/// taken from the request path (registered for `POST .d/:device_id`).
///
/// Any previous state for the same device id is overwritten.
///
/// # Returns
/// Always `Ok` with an `ApiResponse` whose `status` is `"success"`; no code
/// path in this handler produces an `Err`.
async fn update_device_state(
    Path(device_id): Path<String>,
    Cbor(new_state): Cbor<DeviceState>,
    Identity(client_id): Identity,
    State(app_state): State<AppState>,
) -> Result<Cbor<ApiResponse>, StatusCode> {
    log::info!(
        "Updating device {} state from client {}: temp={}°C, humidity={}%, battery={}%",
        device_id,
        client_id,
        new_state.temperature,
        new_state.humidity,
        new_state.battery_level
    );

    // Build the reply first: it only borrows `device_id`, which lets the owned
    // id and state be moved into the map below without cloning either.
    let response = ApiResponse {
        status: "success".to_string(),
        message: format!("Device {} state updated", device_id),
    };

    // The guard is a temporary, so the lock is released as soon as the insert
    // completes instead of being held until the handler returns.
    app_state
        .device_states
        .lock()
        .await
        .insert(device_id, new_state);

    Ok(Cbor(response))
}
/// Returns the stored state for the device named in the request path
/// (registered for `GET .d/:device_id` and as the first `.observe` handler).
///
/// When nothing has been stored for that device, a fixed placeholder reading
/// is returned instead of an error, so this handler never yields `Err`.
async fn get_device_state(
    Path(device_id): Path<String>,
    Identity(client_id): Identity,
    State(app_state): State<AppState>,
) -> Result<Cbor<DeviceState>, StatusCode> {
    log::info!("Getting device {} state for client {}", device_id, client_id);

    let table = app_state.device_states.lock().await;
    let snapshot = match table.get(&device_id) {
        Some(stored) => stored.clone(),
        // Unknown device: answer with canned demo values.
        None => DeviceState {
            temperature: 23.5,
            humidity: 45.2,
            battery_level: 85,
        },
    };

    Ok(Cbor(snapshot))
}
/// Produces the device state payload used for notifications; passed as the
/// second handler to `.observe(".d/:device_id", ..)` in `main`.
// NOTE(review): presumably invoked by coapum when observers are notified —
// confirm against the coapum router documentation.
///
/// Falls back to a fixed placeholder reading when no state is stored for the
/// device.
async fn notify_device_state(
    Path(device_id): Path<String>,
    Identity(client_id): Identity,
    State(app_state): State<AppState>,
) -> Cbor<DeviceState> {
    log::info!(
        "Sending notification for device {} to client {}",
        device_id,
        client_id
    );

    let table = app_state.device_states.lock().await;
    let snapshot = if let Some(stored) = table.get(&device_id) {
        stored.clone()
    } else {
        // Unknown device: answer with canned demo values.
        DeviceState {
            temperature: 24.1,
            humidity: 43.8,
            battery_level: 84,
        }
    };

    Cbor(snapshot)
}
/// Forgets the stored state for the device named in the request path
/// (registered for `DELETE .d/:device_id`).
///
/// Always answers `Ok(StatusCode::Deleted)`, whether or not an entry existed.
async fn delete_device_state(
    Path(device_id): Path<String>,
    Identity(client_id): Identity,
    State(app_state): State<AppState>,
) -> Result<StatusCode, StatusCode> {
    log::info!("Deleting device {} state for client {}", device_id, client_id);

    // The removed value (if any) is intentionally discarded.
    app_state.device_states.lock().await.remove(&device_id);

    Ok(StatusCode::Deleted)
}
/// Acknowledges a post to a stream endpoint (registered for
/// `POST .s/:stream_id`).
///
/// Only the stream id and client identity are read and logged; the request
/// payload is not extracted. Always answers `StatusCode::Valid`.
async fn handle_stream_data(
    Path(stream_id): Path<String>,
    Identity(client_id): Identity,
) -> StatusCode {
    log::info!("Received stream data for {} from client {}", stream_id, client_id);

    StatusCode::Valid
}
/// Sends the request payload back unchanged, logging its length in bytes.
async fn echo_handler(payload: coapum::extract::Bytes) -> coapum::extract::Bytes {
    let byte_count = payload.len();
    log::info!("Echoing {} bytes", byte_count);

    payload
}
/// Liveness check on the empty path: logs the caller's identity and answers
/// `StatusCode::Valid`.
async fn ping_handler(Identity(client_id): Identity) -> StatusCode {
    log::info!("Ping from {}", client_id);

    StatusCode::Valid
}
/// Entry point: configures and runs the example CoAP server over DTLS-PSK.
///
/// Steps, in order:
/// 1. Initialise logging.
/// 2. Create the PSK store and register the demo identity `goobie!`.
/// 3. Build the router with the device, stream, echo and ping routes.
/// 4. Build the DTLS configuration whose PSK callback looks up the client's
///    identity hint in the store.
/// 5. Serve on `127.0.0.1:5684` until `serve` returns; an error is logged.
#[tokio::main]
async fn main() {
    env_logger::init();
    log::info!("Starting ergonomic CoAP server!");

    let psk_store: PskStore = Arc::new(RwLock::new(HashMap::new()));
    // The write guard is a temporary and is released at the end of this statement.
    psk_store
        .write()
        .expect("freshly created PSK store lock cannot be poisoned")
        .insert("goobie!".to_string(), PSK.to_vec());

    let app_state = AppState {
        device_states: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
    };
    let observer = MemObserver::new();

    let router = RouterBuilder::new(app_state, observer)
        .post(".d/:device_id", update_device_state)
        .get(".d/:device_id", get_device_state)
        .observe(".d/:device_id", get_device_state, notify_device_state)
        .delete(".d/:device_id", delete_device_state)
        .post(".s/:stream_id", handle_stream_data)
        .put("echo", echo_handler)
        .get("hello", echo_handler)
        .get("", ping_handler)
        .build();

    let addr = "127.0.0.1:5684";

    let dtls_cfg = Config {
        psk: Some(Arc::new(move |hint: &[u8]| -> Result<Vec<u8>, Error> {
            // The hint comes from the remote peer, so it must never be able to
            // panic this callback: reject anything that is not valid UTF-8.
            let hint = match std::str::from_utf8(hint) {
                Ok(hint) => hint,
                Err(e) => {
                    log::warn!("Rejecting non-UTF-8 PSK identity hint: {}", e);
                    return Err(Error::ErrIdentityNoPsk);
                }
            };
            log::info!("Client's hint: {}", hint);

            // A poisoned lock still holds a usable map for this read-only
            // lookup, so recover the guard rather than panicking on every
            // subsequent handshake. The guard is dropped at the end of the
            // statement.
            let psk = psk_store
                .read()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .get(hint)
                .cloned();

            match psk {
                Some(psk) => Ok(psk),
                None => {
                    log::info!("Hint {} not found in store", hint);
                    Err(Error::ErrIdentityNoPsk)
                }
            }
        })),
        psk_identity_hint: Some("coapum ergonomic server".as_bytes().to_vec()),
        cipher_suites: vec![CipherSuiteId::Tls_Psk_With_Aes_128_Gcm_Sha256],
        extended_master_secret: ExtendedMasterSecretType::Require,
        ..Default::default()
    };

    let cfg = coapum::config::Config {
        dtls_cfg,
        ..Default::default()
    };

    // Keep this listing in sync with the router registrations above.
    log::info!("Server listening on {}", addr);
    log::info!("Routes configured:");
    log::info!(" POST .d/:device_id - Update device state");
    log::info!(" GET .d/:device_id - Get device state");
    log::info!(" OBSERVE .d/:device_id - Observe device state");
    log::info!(" DELETE .d/:device_id - Delete device state");
    log::info!(" POST .s/:stream_id - Handle stream data");
    log::info!(" PUT echo - Echo payload");
    log::info!(" GET hello - Echo payload");
    log::info!(" GET / - Ping");

    if let Err(e) = serve::serve(addr.to_string(), cfg, router).await {
        log::error!("Server error: {}", e);
    }
}