1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, Mutex, RwLock};
3use std::time::Instant;
4
5use serde::Serialize;
6
7use rns_crypto::identity::Identity;
8use rns_net::{Destination, RnsNode};
9
10use crate::encode::to_hex;
11
12const MAX_RECORDS: usize = 1000;
13
14pub type SharedState = Arc<RwLock<CtlState>>;
16
17pub type WsBroadcast = Arc<Mutex<Vec<std::sync::mpsc::Sender<WsEvent>>>>;
19
20pub struct CtlState {
21 pub started_at: Instant,
22 pub identity_hash: Option<[u8; 16]>,
23 pub identity: Option<Identity>,
24 pub announces: VecDeque<AnnounceRecord>,
25 pub packets: VecDeque<PacketRecord>,
26 pub proofs: VecDeque<ProofRecord>,
27 pub link_events: VecDeque<LinkEventRecord>,
28 pub resource_events: VecDeque<ResourceEventRecord>,
29 pub destinations: HashMap<[u8; 16], DestinationEntry>,
30 pub node_handle: Option<Arc<Mutex<Option<RnsNode>>>>,
31}
32
33pub struct DestinationEntry {
35 pub destination: Destination,
36 pub full_name: String,
38}
39
40impl CtlState {
41 pub fn new() -> Self {
42 CtlState {
43 started_at: Instant::now(),
44 identity_hash: None,
45 identity: None,
46 announces: VecDeque::new(),
47 packets: VecDeque::new(),
48 proofs: VecDeque::new(),
49 link_events: VecDeque::new(),
50 resource_events: VecDeque::new(),
51 destinations: HashMap::new(),
52 node_handle: None,
53 }
54 }
55
56 pub fn uptime_seconds(&self) -> f64 {
57 self.started_at.elapsed().as_secs_f64()
58 }
59}
60
61fn push_capped<T>(deque: &mut VecDeque<T>, item: T) {
62 if deque.len() >= MAX_RECORDS {
63 deque.pop_front();
64 }
65 deque.push_back(item);
66}
67
68pub fn push_announce(state: &SharedState, record: AnnounceRecord) {
69 let mut s = state.write().unwrap();
70 push_capped(&mut s.announces, record);
71}
72
73pub fn push_packet(state: &SharedState, record: PacketRecord) {
74 let mut s = state.write().unwrap();
75 push_capped(&mut s.packets, record);
76}
77
78pub fn push_proof(state: &SharedState, record: ProofRecord) {
79 let mut s = state.write().unwrap();
80 push_capped(&mut s.proofs, record);
81}
82
83pub fn push_link_event(state: &SharedState, record: LinkEventRecord) {
84 let mut s = state.write().unwrap();
85 push_capped(&mut s.link_events, record);
86}
87
88pub fn push_resource_event(state: &SharedState, record: ResourceEventRecord) {
89 let mut s = state.write().unwrap();
90 push_capped(&mut s.resource_events, record);
91}
92
93pub fn broadcast(ws: &WsBroadcast, event: WsEvent) {
95 let mut senders = ws.lock().unwrap();
96 senders.retain(|tx| tx.send(event.clone()).is_ok());
97}
98
99#[derive(Debug, Clone, Serialize)]
102pub struct AnnounceRecord {
103 pub dest_hash: String,
104 pub identity_hash: String,
105 pub hops: u8,
106 pub app_data: Option<String>,
107 pub received_at: f64,
108}
109
110#[derive(Debug, Clone, Serialize)]
111pub struct PacketRecord {
112 pub dest_hash: String,
113 pub packet_hash: String,
114 pub data_base64: String,
115 pub received_at: f64,
116}
117
118#[derive(Debug, Clone, Serialize)]
119pub struct ProofRecord {
120 pub dest_hash: String,
121 pub packet_hash: String,
122 pub rtt: f64,
123}
124
125#[derive(Debug, Clone, Serialize)]
126pub struct LinkEventRecord {
127 pub link_id: String,
128 pub event_type: String,
129 pub is_initiator: Option<bool>,
130 pub rtt: Option<f64>,
131 pub identity_hash: Option<String>,
132 pub reason: Option<String>,
133}
134
135#[derive(Debug, Clone, Serialize)]
136pub struct ResourceEventRecord {
137 pub link_id: String,
138 pub event_type: String,
139 pub data_base64: Option<String>,
140 pub metadata_base64: Option<String>,
141 pub error: Option<String>,
142 pub received: Option<usize>,
143 pub total: Option<usize>,
144}
145
146#[derive(Debug, Clone)]
149pub struct WsEvent {
150 pub topic: &'static str,
151 pub payload: serde_json::Value,
152}
153
154impl WsEvent {
155 pub fn announce(record: &AnnounceRecord) -> Self {
156 WsEvent {
157 topic: "announces",
158 payload: serde_json::to_value(record).unwrap_or_default(),
159 }
160 }
161
162 pub fn packet(record: &PacketRecord) -> Self {
163 WsEvent {
164 topic: "packets",
165 payload: serde_json::to_value(record).unwrap_or_default(),
166 }
167 }
168
169 pub fn proof(record: &ProofRecord) -> Self {
170 WsEvent {
171 topic: "proofs",
172 payload: serde_json::to_value(record).unwrap_or_default(),
173 }
174 }
175
176 pub fn link(record: &LinkEventRecord) -> Self {
177 WsEvent {
178 topic: "links",
179 payload: serde_json::to_value(record).unwrap_or_default(),
180 }
181 }
182
183 pub fn resource(record: &ResourceEventRecord) -> Self {
184 WsEvent {
185 topic: "resources",
186 payload: serde_json::to_value(record).unwrap_or_default(),
187 }
188 }
189
190 pub fn to_json(&self) -> String {
191 let obj = serde_json::json!({
192 "type": self.topic.trim_end_matches('s'),
193 "data": self.payload,
194 });
195 serde_json::to_string(&obj).unwrap_or_default()
196 }
197}
198
199pub fn make_announce_record(announced: &rns_net::AnnouncedIdentity) -> AnnounceRecord {
201 AnnounceRecord {
202 dest_hash: to_hex(&announced.dest_hash.0),
203 identity_hash: to_hex(&announced.identity_hash.0),
204 hops: announced.hops,
205 app_data: announced
206 .app_data
207 .as_ref()
208 .map(|d| crate::encode::to_base64(d)),
209 received_at: announced.received_at,
210 }
211}