1use async_trait::async_trait;
16use pingora::server::ShutdownWatch;
17use pingora::services::background::BackgroundService;
18use pyroscope::{
19 PyroscopeAgent, PyroscopeError, pyroscope::PyroscopeAgentRunning,
20};
21use pyroscope_pprofrs::{PprofConfig, pprof_backend};
22use snafu::{ResultExt, Snafu};
23use substring::Substring;
24use tracing::{error, info};
25use url::Url;
26
/// Value passed as the `target:` of the tracing events emitted by this module.
static LOG_TARGET: &str = "pingap::pyroscope";
28
/// Errors that can occur while configuring or starting the Pyroscope agent.
#[derive(Debug, Snafu)]
pub enum Error {
    /// The configured value could not be parsed as a URL.
    #[snafu(display("Url parse error {source}, {url}"))]
    UrlParse {
        /// Parse failure reported by the `url` crate.
        source: url::ParseError,
        /// The text that failed to parse.
        url: String,
    },
    /// The Pyroscope agent reported a failure.
    #[snafu(display("Pyroscope error {source}"))]
    Pyroscope { source: PyroscopeError },
}
/// Result alias whose error type defaults to this module's [`Error`].
type Result<T, E = Error> = std::result::Result<T, E>;
40
/// Service holding the configuration needed to run a Pyroscope agent.
pub struct AgentService {
    /// Connection url, stored exactly as it was passed to
    /// [`new_agent_service`] (query string included).
    url: String,
}

/// Creates an [`AgentService`] that keeps an owned copy of `value` as its
/// connection url. The url is not validated here.
pub fn new_agent_service(value: &str) -> AgentService {
    let url = String::from(value);
    AgentService { url }
}
50
51#[async_trait]
52impl BackgroundService for AgentService {
53 async fn start(&self, mut shutdown: ShutdownWatch) {
54 match start_pyroscope(&self.url) {
55 Ok(agent_running) => {
56 let _ = shutdown.changed().await;
57 match agent_running.stop() {
58 Ok(agent_ready) => {
59 agent_ready.shutdown();
60 },
61 Err(e) => {
62 error!("stop pyroscope error: {}", e);
63 },
64 }
65 },
66 Err(e) => {
67 error!("start pyroscope error: {}", e);
68 },
69 }
70 }
71}
72
73fn start_pyroscope(
74 value: &str,
75) -> Result<PyroscopeAgent<PyroscopeAgentRunning>> {
76 let mut connect_url = value.to_string();
77 let url_info = Url::parse(value).context(UrlParseSnafu {
78 url: value.to_string(),
79 })?;
80 let mut application_name = "pingap".to_string();
81 let mut user = "".to_string();
82 let mut password = "".to_string();
83 let mut sample_rate = 100;
84 let mut tags = vec![];
85 let format_tag_value = |value: &str| {
86 if value.starts_with("$") {
87 std::env::var(value.substring(1, value.len()))
88 .unwrap_or(value.to_string())
89 } else {
90 value.to_string()
91 }
92 };
93 let tag_key_prefix = "tag:";
94 for (key, value) in url_info.query_pairs().into_iter() {
95 match key.as_ref() {
96 "app" => application_name = value.to_string(),
97 "user" => user = value.to_string(),
98 "password" => password = value.to_string(),
99 "sample_rate" => {
100 if let Ok(v) = value.parse::<u32>() {
101 sample_rate = v;
102 }
103 },
104 _ if key.starts_with(tag_key_prefix) => {
105 let tag_value = format_tag_value(&value);
106 let key =
107 key.substring(tag_key_prefix.len(), key.len()).to_string();
108 tags.push((key.to_string(), tag_value));
109 },
110 _ => {},
111 };
112 }
113 if let Some(query) = url_info.query() {
114 connect_url = connect_url.replace(&format!("?{query}"), "");
115 }
116
117 let mut agent = PyroscopeAgent::builder(&connect_url, &application_name);
118 if !user.is_empty() {
119 agent = agent.basic_auth(user, password);
120 }
121 let client = agent
122 .backend(pprof_backend(
123 PprofConfig::new()
124 .sample_rate(sample_rate)
125 .report_thread_id()
126 .report_thread_name(),
127 ))
128 .tags(tags.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect())
129 .build()
130 .context(PyroscopeSnafu)?;
131 info!(
132 target: LOG_TARGET,
133 application_name = application_name,
134 sample_rate = sample_rate,
135 url = connect_url,
136 tags = tags
137 .iter()
138 .map(|(k, v)| format!("{k}:{v}"))
139 .collect::<Vec<String>>()
140 .join(","),
141 "connect to pyroscope",
142 );
143 client.start().context(PyroscopeSnafu)
144}