1use futures_util::{Stream, StreamExt};
2use serde::{Deserialize, Serialize};
5use serde_json;
6use std::{
7 cell::RefCell,
8 path::Path,
9 rc::Rc,
10 sync::{Arc, Mutex},
11};
12use tokio::sync::{broadcast::Receiver, mpsc};
13use tokio_stream::wrappers::UnboundedReceiverStream;
14use warp::sse::Event;
15
16#[derive(Serialize, Deserialize, Default)]
19struct Stack<'a> {
20 name: &'a str,
21 value: usize,
22 children: Vec<Rc<RefCell<Stack<'a>>>>,
23}
24
25impl<'a> Stack<'a> {
26 fn new(name: &'a str) -> Rc<RefCell<Self>> {
27 Rc::new(RefCell::new(Self {
28 name,
29 ..Default::default()
30 }))
31 }
32}
33
34pub async fn start_server(mut rx: Receiver<String>) {
36 use warp::Filter;
37
38 let latest_data = Arc::new(Mutex::new("{}".to_string()));
39 let subscribers = Arc::new(Mutex::new(Vec::<mpsc::UnboundedSender<String>>::new()));
40 let access_subscribers = subscribers.clone();
41 let subscriptions = warp::any().map(move || subscribers.clone());
42
43 let writer = latest_data.clone();
44 tokio::spawn(async move {
45 while let Ok(data) = rx.recv().await {
46 println!("Received....");
47 let mut write = writer.lock().expect("poisoned");
48 *write = data.clone();
49 drop(write);
50
51 let mut write = access_subscribers.lock().expect("poisoned");
52 write.retain(|tx| tx.send(data.clone()).is_ok());
53 println!("{} realtime subscribers", write.len());
54 }
55 });
56
57 let json_copy = latest_data.clone();
58 let json = warp::path("json").and(warp::get()).map(move || {
60 let json = json_copy.lock().expect("poisoned").clone();
61
62 warp::http::Response::builder()
63 .header("content-type", "application/json")
64 .body(json)
65 });
66
67 let stream = warp::path("stream")
69 .and(warp::get())
70 .and(subscriptions)
71 .map(|subscriptions| {
72 let stream = connected(subscriptions);
73
74 warp::sse::reply(warp::sse::keep_alive().stream(stream))
76 });
77
78 let index = warp::path::end().map(move || {
80 let json_copy = latest_data.lock().expect("poisoned");
81
82 warp::http::Response::builder()
83 .header("content-type", "text/html; charset=utf-8")
84 .body(flamegraph_html(&json_copy))
85 });
86
87 eprintln!("Listening on port 8000. Goto http://localhost:8000/");
88 warp::serve(index.or(json).or(stream))
89 .run(([127, 0, 0, 1], 8000))
90 .await;
91}
92
93fn connected(
94 subscriptions: Arc<Mutex<Vec<mpsc::UnboundedSender<String>>>>,
95) -> impl Stream<Item = Result<Event, warp::Error>> + Send + 'static {
96 println!("new subscription");
97
98 let (tx, rx) = mpsc::unbounded_channel();
101 let rx = UnboundedReceiverStream::new(rx);
102
103 subscriptions.lock().unwrap().push(tx);
105
106 rx.map(|msg| Ok(Event::default().data(msg)))
107}
108
109pub fn collapse_to_json(stacks: &[&str]) -> String {
111 let root = Stack::new("");
112 let mut crumbs = vec![root.clone()];
113
114 for stack in stacks {
115 let mut parts = stack.split(' ');
116 let names = parts.next().map(|v| v.split(';')).expect("stack");
117 let count = parts
118 .next()
119 .and_then(|v| v.parse::<usize>().ok())
120 .unwrap_or(1);
121
122 let mut depth = 0;
123
124 for name in names {
125 depth += 1;
126
127 if depth >= crumbs.len() || name != crumbs[depth].borrow().name {
128 crumbs.truncate(depth);
130
131 let node = Stack::new(name);
132
133 crumbs[depth - 1].borrow_mut().children.push(node.clone());
134 crumbs.push(node);
135 }
136 }
137
138 if depth + 1 != crumbs.len() {
139 crumbs.truncate(depth);
140 }
141
142 let self_value = false;
143 if self_value {
144 crumbs
147 .last()
148 .unwrap_or_else(|| unreachable!("always have one"))
149 .borrow_mut()
150 .value += count;
151 } else {
152 for node in crumbs.iter() {
154 node.borrow_mut().value += count;
155 }
156 }
157 }
158
159 serde_json::to_string(&root).expect("serialization to json")
160}
161
162pub fn generate_html_file(filename: &Path, data: &str) {
163 let html = flamegraph_html(data);
164 std::fs::write(filename, html).expect("Unable to write stack html file");
165}
166
/// Page template, embedded at compile time. `flamegraph_html` substitutes the
/// `{stack}`, `{title}`, `{scripts}` and `{styles}` placeholders in it.
const HTML_TEMPLATE: &str = include_str!("../assets/d3-flamegraph.html");
/// JavaScript that is inlined into a `<script>` tag when assets are embedded.
const SCRIPTS: &str = include_str!("../assets/scripts.js");
/// CSS that is inlined into a `<style>` tag when assets are embedded.
const STYLES: &str = include_str!("../assets/styles.css");

/// `<script>` tags loading the JavaScript from CDNs; used instead of
/// `SCRIPTS` when assets are not embedded.
const EXTERNAL_SCRIPTS: &str = r#"<script src="https://d3js.org/d3.v7.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/d3-tip/0.9.1/d3-tip.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/d3-flame-graph@4.1.3/dist/d3-flamegraph.min.js"></script>"#;

/// `<link>` tags loading the stylesheets from CDNs; used instead of `STYLES`
/// when assets are not embedded.
const EXTERNAL_STYLES: &str = r#"
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" />
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/d3-flame-graph@4.1.3/dist/d3-flamegraph.css" />"#;
179
180fn flamegraph_html(stacks: &str) -> String {
181 let embedded = true;
182
183 let template = HTML_TEMPLATE
184 .replace("{stack}", stacks)
185 .replace("{title}", "profile-bee");
186
187 if embedded {
188 template
189 .replace("{scripts}", &format!("<script>{}</script>", SCRIPTS))
190 .replace("{styles}", &format!("<style>{}</style>", STYLES))
191 } else {
192 template
193 .replace("{scripts}", EXTERNAL_SCRIPTS)
194 .replace("{styles}", EXTERNAL_STYLES)
195 }
196}
197
/// Checks the JSON produced by `collapse_to_json` for a small set of stacks
/// and the JSON shape of a hand-built `Stack` tree.
#[test]
fn test_serialization() {
    let collapsed = [
        "a 1",
        "a;b 1",
        "a;b 1",
        "a;b;c 1",
        "a;b;c;d 1",
        "a;b;e 3",
        "f;g 1",
    ];
    let expected_tree = r##"{"name":"","value":9,"children":[{"name":"a","value":8,"children":[{"name":"b","value":7,"children":[{"name":"c","value":2,"children":[{"name":"d","value":1,"children":[]}]},{"name":"e","value":3,"children":[]}]}]},{"name":"f","value":1,"children":[{"name":"g","value":1,"children":[]}]}]}"##;

    assert_eq!(collapse_to_json(&collapsed), expected_tree);

    // Build a two-level tree by hand to pin the serialized field layout.
    let leaf = |name: &'static str, value: usize| {
        Rc::new(RefCell::new(Stack {
            name,
            value,
            children: Vec::new(),
        }))
    };
    let parent = Stack {
        name: "hi",
        value: 10,
        children: vec![leaf("test 1", 3), leaf("test 2", 4)],
    };

    let parent_json = serde_json::to_string(&parent).expect("serialization to json");

    assert_eq!(
        parent_json,
        r##"{"name":"hi","value":10,"children":[{"name":"test 1","value":3,"children":[]},{"name":"test 2","value":4,"children":[]}]}"##
    );
}