1use async_trait::async_trait;
16use pingora::server::ShutdownWatch;
17use pingora::services::background::BackgroundService;
18use pyroscope::{
19 pyroscope::PyroscopeAgentRunning, PyroscopeAgent, PyroscopeError,
20};
21use pyroscope_pprofrs::{pprof_backend, PprofConfig};
22use snafu::{ResultExt, Snafu};
23use substring::Substring;
24use tracing::{error, info};
25use url::Url;
26
27#[derive(Debug, Snafu)]
28pub enum Error {
29 #[snafu(display("Url parse error {source}, {url}"))]
30 UrlParse {
31 source: url::ParseError,
32 url: String,
33 },
34 #[snafu(display("Pyroscope error {source}"))]
35 Pyroscope { source: PyroscopeError },
36}
37type Result<T, E = Error> = std::result::Result<T, E>;
38
39pub struct AgentService {
40 url: String,
41}
42
43pub fn new_agent_service(value: &str) -> AgentService {
44 AgentService {
45 url: value.to_string(),
46 }
47}
48
49#[async_trait]
50impl BackgroundService for AgentService {
51 async fn start(&self, mut shutdown: ShutdownWatch) {
52 match start_pyroscope(&self.url) {
53 Ok(agent_running) => {
54 let _ = shutdown.changed().await;
55 let agent_ready = agent_running.stop().unwrap();
56 agent_ready.shutdown();
57 },
58 Err(e) => {
59 error!("start pyroscope error: {}", e);
60 },
61 }
62 }
63}
64
65fn start_pyroscope(
66 value: &str,
67) -> Result<PyroscopeAgent<PyroscopeAgentRunning>> {
68 let mut connect_url = value.to_string();
69 let url_info = Url::parse(value).context(UrlParseSnafu {
70 url: value.to_string(),
71 })?;
72 let mut application_name = "pingap".to_string();
73 let mut user = "".to_string();
74 let mut password = "".to_string();
75 let mut sample_rate = 100;
76 let mut tags = vec![];
77 let format_tag_value = |value: &str| {
78 if value.starts_with("$") {
79 std::env::var(value.substring(1, value.len()))
80 .unwrap_or(value.to_string())
81 } else {
82 value.to_string()
83 }
84 };
85 let tag_key_prefix = "tag:";
86 for (key, value) in url_info.query_pairs().into_iter() {
87 match key.as_ref() {
88 "app" => application_name = value.to_string(),
89 "user" => user = value.to_string(),
90 "password" => password = value.to_string(),
91 "sample_rate" => {
92 if let Ok(v) = value.parse::<u32>() {
93 sample_rate = v;
94 }
95 },
96 _ if key.starts_with(tag_key_prefix) => {
97 let tag_value = format_tag_value(&value);
98 let key =
99 key.substring(tag_key_prefix.len(), key.len()).to_string();
100 tags.push((key.to_string(), tag_value));
101 },
102 _ => {},
103 };
104 }
105 if let Some(query) = url_info.query() {
106 connect_url = connect_url.replace(&format!("?{query}"), "");
107 }
108
109 let mut agent = PyroscopeAgent::builder(&connect_url, &application_name);
110 if !user.is_empty() {
111 agent = agent.basic_auth(user, password);
112 }
113 let client = agent
114 .backend(pprof_backend(
115 PprofConfig::new()
116 .sample_rate(sample_rate)
117 .report_thread_id()
118 .report_thread_name(),
119 ))
120 .tags(tags.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect())
121 .build()
122 .context(PyroscopeSnafu)?;
123 info!(
124 application_name = application_name,
125 sample_rate = sample_rate,
126 url = connect_url,
127 tags = tags
128 .iter()
129 .map(|(k, v)| format!("{k}:{v}"))
130 .collect::<Vec<String>>()
131 .join(","),
132 "connect to pyroscope",
133 );
134 client.start().context(PyroscopeSnafu)
135}