1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, Mutex, RwLock};
3use std::time::Instant;
4
5use serde::Serialize;
6
7use rns_net::Destination;
8use rns_crypto::identity::Identity;
9
10use crate::encode::to_hex;
11
12const MAX_RECORDS: usize = 1000;
13
14pub type SharedState = Arc<RwLock<CtlState>>;
16
17pub type WsBroadcast = Arc<Mutex<Vec<std::sync::mpsc::Sender<WsEvent>>>>;
19
20pub struct CtlState {
21 pub started_at: Instant,
22 pub identity_hash: Option<[u8; 16]>,
23 pub identity: Option<Identity>,
24 pub announces: VecDeque<AnnounceRecord>,
25 pub packets: VecDeque<PacketRecord>,
26 pub proofs: VecDeque<ProofRecord>,
27 pub link_events: VecDeque<LinkEventRecord>,
28 pub resource_events: VecDeque<ResourceEventRecord>,
29 pub destinations: HashMap<[u8; 16], DestinationEntry>,
30}
31
32pub struct DestinationEntry {
34 pub destination: Destination,
35 pub full_name: String,
37}
38
39impl CtlState {
40 pub fn new() -> Self {
41 CtlState {
42 started_at: Instant::now(),
43 identity_hash: None,
44 identity: None,
45 announces: VecDeque::new(),
46 packets: VecDeque::new(),
47 proofs: VecDeque::new(),
48 link_events: VecDeque::new(),
49 resource_events: VecDeque::new(),
50 destinations: HashMap::new(),
51 }
52 }
53
54 pub fn uptime_seconds(&self) -> f64 {
55 self.started_at.elapsed().as_secs_f64()
56 }
57}
58
59fn push_capped<T>(deque: &mut VecDeque<T>, item: T) {
60 if deque.len() >= MAX_RECORDS {
61 deque.pop_front();
62 }
63 deque.push_back(item);
64}
65
66pub fn push_announce(state: &SharedState, record: AnnounceRecord) {
67 let mut s = state.write().unwrap();
68 push_capped(&mut s.announces, record);
69}
70
71pub fn push_packet(state: &SharedState, record: PacketRecord) {
72 let mut s = state.write().unwrap();
73 push_capped(&mut s.packets, record);
74}
75
76pub fn push_proof(state: &SharedState, record: ProofRecord) {
77 let mut s = state.write().unwrap();
78 push_capped(&mut s.proofs, record);
79}
80
81pub fn push_link_event(state: &SharedState, record: LinkEventRecord) {
82 let mut s = state.write().unwrap();
83 push_capped(&mut s.link_events, record);
84}
85
86pub fn push_resource_event(state: &SharedState, record: ResourceEventRecord) {
87 let mut s = state.write().unwrap();
88 push_capped(&mut s.resource_events, record);
89}
90
91pub fn broadcast(ws: &WsBroadcast, event: WsEvent) {
93 let mut senders = ws.lock().unwrap();
94 senders.retain(|tx| tx.send(event.clone()).is_ok());
95}
96
97#[derive(Debug, Clone, Serialize)]
100pub struct AnnounceRecord {
101 pub dest_hash: String,
102 pub identity_hash: String,
103 pub hops: u8,
104 pub app_data: Option<String>,
105 pub received_at: f64,
106}
107
108#[derive(Debug, Clone, Serialize)]
109pub struct PacketRecord {
110 pub dest_hash: String,
111 pub packet_hash: String,
112 pub data_base64: String,
113 pub received_at: f64,
114}
115
116#[derive(Debug, Clone, Serialize)]
117pub struct ProofRecord {
118 pub dest_hash: String,
119 pub packet_hash: String,
120 pub rtt: f64,
121}
122
123#[derive(Debug, Clone, Serialize)]
124pub struct LinkEventRecord {
125 pub link_id: String,
126 pub event_type: String,
127 pub is_initiator: Option<bool>,
128 pub rtt: Option<f64>,
129 pub identity_hash: Option<String>,
130 pub reason: Option<String>,
131}
132
133#[derive(Debug, Clone, Serialize)]
134pub struct ResourceEventRecord {
135 pub link_id: String,
136 pub event_type: String,
137 pub data_base64: Option<String>,
138 pub metadata_base64: Option<String>,
139 pub error: Option<String>,
140 pub received: Option<usize>,
141 pub total: Option<usize>,
142}
143
144#[derive(Debug, Clone)]
147pub struct WsEvent {
148 pub topic: &'static str,
149 pub payload: serde_json::Value,
150}
151
152impl WsEvent {
153 pub fn announce(record: &AnnounceRecord) -> Self {
154 WsEvent {
155 topic: "announces",
156 payload: serde_json::to_value(record).unwrap_or_default(),
157 }
158 }
159
160 pub fn packet(record: &PacketRecord) -> Self {
161 WsEvent {
162 topic: "packets",
163 payload: serde_json::to_value(record).unwrap_or_default(),
164 }
165 }
166
167 pub fn proof(record: &ProofRecord) -> Self {
168 WsEvent {
169 topic: "proofs",
170 payload: serde_json::to_value(record).unwrap_or_default(),
171 }
172 }
173
174 pub fn link(record: &LinkEventRecord) -> Self {
175 WsEvent {
176 topic: "links",
177 payload: serde_json::to_value(record).unwrap_or_default(),
178 }
179 }
180
181 pub fn resource(record: &ResourceEventRecord) -> Self {
182 WsEvent {
183 topic: "resources",
184 payload: serde_json::to_value(record).unwrap_or_default(),
185 }
186 }
187
188 pub fn to_json(&self) -> String {
189 let obj = serde_json::json!({
190 "type": self.topic.trim_end_matches('s'),
191 "data": self.payload,
192 });
193 serde_json::to_string(&obj).unwrap_or_default()
194 }
195}
196
197pub fn make_announce_record(announced: &rns_net::AnnouncedIdentity) -> AnnounceRecord {
199 AnnounceRecord {
200 dest_hash: to_hex(&announced.dest_hash.0),
201 identity_hash: to_hex(&announced.identity_hash.0),
202 hops: announced.hops,
203 app_data: announced.app_data.as_ref().map(|d| crate::encode::to_base64(d)),
204 received_at: announced.received_at,
205 }
206}