1use super::{
16 DetectorGroupModel, Error, JsonSnafu, Model, ModelListParams, Schema, SchemaAllowCreate,
17 SchemaAllowEdit, SchemaOption, SchemaOptionValue, SchemaType, SchemaView, SqlxSnafu,
18 format_datetime, new_schema_options,
19};
20use serde::{Deserialize, Serialize};
21use snafu::ResultExt;
22use sqlx::FromRow;
23use sqlx::types::Json;
24use sqlx::{Pool, Postgres, QueryBuilder};
25use std::collections::HashMap;
26use time::PrimitiveDateTime;
27
/// Wildcard region label: `list_enabled_by_region` falls back to it when no
/// region is given and always matches detectors whose `regions` contain it.
pub const REGION_ANY: &str = "any";
/// Region label offered as a `regions` option in the schema view.
/// NOTE(review): presumably a cloud/provider shorthand — confirm the exact meaning.
pub const REGION_TX: &str = "tx";
/// Region label offered as a `regions` option in the schema view.
/// NOTE(review): exact location/provider meaning is not visible here — confirm.
pub const REGION_GZ: &str = "gz";
/// Region label offered as a `regions` option in the schema view.
pub const REGION_ALIYUN: &str = "aliyun";
32
/// Module-local result alias that fixes the error type to the shared `Error`.
type Result<T> = std::result::Result<T, Error>;
34
/// Row of the `http_detectors` table as loaded through `sqlx` (`FromRow`).
///
/// JSON columns are wrapped in [`Json`]; the row is converted into the public
/// [`HttpDetector`] through its `From` implementation.
#[derive(FromRow)]
struct HttpDetectorSchema {
    id: i64,
    status: i16,
    name: String,
    interval: i16,
    url: String,
    method: String,
    // Optional JSON columns: `None` when the column holds no value.
    alpn_protocols: Option<Json<Vec<String>>>,
    resolves: Option<Json<Vec<String>>>,
    headers: Option<Json<HashMap<String, String>>>,
    ip_version: i16,
    skip_verify: bool,
    dns_servers: Option<Json<Vec<String>>>,
    body: Option<Vec<u8>>,
    script: Option<String>,
    alarm_url: String,
    random_querystring: bool,
    alarm_on_change: bool,
    retries: i16,
    failure_threshold: i16,
    // Always present JSON array; an empty array matches every region in
    // `list_enabled_by_region`.
    regions: Json<Vec<String>>,
    group_id: i64,
    verbose: bool,
    created_by: i64,
    remark: String,
    created: PrimitiveDateTime,
    modified: PrimitiveDateTime,
}
64
/// Serializable view of an HTTP detector, built from a `HttpDetectorSchema` row.
///
/// JSON columns are exposed as plain collections and the timestamps as
/// strings produced by `format_datetime`.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct HttpDetector {
    pub id: i64,
    pub status: i16,
    pub name: String,
    pub group_id: i64,
    pub interval: i16,
    pub url: String,
    pub method: String,
    pub alpn_protocols: Option<Vec<String>>,
    pub resolves: Option<Vec<String>>,
    pub headers: Option<HashMap<String, String>>,
    pub dns_servers: Option<Vec<String>>,
    pub ip_version: i16,
    pub skip_verify: bool,
    pub body: Option<Vec<u8>>,
    pub script: Option<String>,
    pub alarm_url: String,
    pub random_querystring: bool,
    pub alarm_on_change: bool,
    pub retries: i16,
    pub failure_threshold: i16,
    pub regions: Vec<String>,
    pub verbose: bool,
    pub created_by: i64,
    pub remark: String,
    // Formatted from the row's `created` timestamp.
    pub created: String,
    // Formatted from the row's `modified` timestamp.
    pub modified: String,
}
94
95impl From<HttpDetectorSchema> for HttpDetector {
96 fn from(schema: HttpDetectorSchema) -> Self {
97 Self {
98 id: schema.id,
99 status: schema.status,
100 name: schema.name,
101 group_id: schema.group_id,
102 interval: schema.interval,
103 url: schema.url,
104 method: schema.method,
105 alpn_protocols: schema.alpn_protocols.map(|protocols| protocols.0),
106 resolves: schema.resolves.map(|resolves| resolves.0),
107 headers: schema.headers.map(|headers| headers.0),
108 dns_servers: schema.dns_servers.map(|dns_servers| dns_servers.0),
109 ip_version: schema.ip_version,
110 skip_verify: schema.skip_verify,
111 body: schema.body,
112 script: schema.script,
113 alarm_url: schema.alarm_url,
114 random_querystring: schema.random_querystring,
115 alarm_on_change: schema.alarm_on_change,
116 retries: schema.retries,
117 failure_threshold: schema.failure_threshold,
118 regions: schema.regions.0,
119 verbose: schema.verbose,
120 created_by: schema.created_by,
121 remark: schema.remark,
122 created: format_datetime(schema.created),
123 modified: format_datetime(schema.modified),
124 }
125 }
126}
127
/// Payload accepted when creating a detector; `insert` deserializes it from
/// a `serde_json::Value`.
///
/// Several integer fields are wider or unsigned compared with what `insert`
/// binds (`i64`/`i16`), so they are converted before being sent to the database.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct HttpDetectorInsertParams {
    pub status: i16,
    pub name: String,
    pub group_id: u64,
    pub url: String,
    pub method: String,
    pub alpn_protocols: Option<Vec<String>>,
    pub resolves: Option<Vec<String>>,
    pub headers: Option<HashMap<String, String>>,
    pub ip_version: i32,
    pub skip_verify: bool,
    pub body: Option<Vec<u8>>,
    pub script: Option<String>,
    // Bound as an empty string by `insert` when absent.
    pub alarm_url: Option<String>,
    pub interval: u16,
    pub random_querystring: bool,
    pub alarm_on_change: bool,
    pub retries: u8,
    pub failure_threshold: u8,
    pub regions: Vec<String>,
    pub verbose: bool,
    pub created_by: u64,
    pub remark: String,
}
153
/// Partial update payload; `update_by_id` deserializes it from a
/// `serde_json::Value`.
///
/// Every field is optional: a `None` is bound as SQL `NULL` and the
/// statement's `COALESCE` then keeps the stored column value.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct HttpDetectorUpdateParams {
    pub status: Option<i16>,
    pub name: Option<String>,
    pub group_id: Option<u64>,
    pub url: Option<String>,
    pub method: Option<String>,
    pub alpn_protocols: Option<Vec<String>>,
    pub resolves: Option<Vec<String>>,
    pub headers: Option<HashMap<String, String>>,
    pub ip_version: Option<i32>,
    pub skip_verify: Option<bool>,
    pub alarm_url: Option<String>,
    pub body: Option<Vec<u8>>,
    pub interval: Option<u16>,
    pub script: Option<String>,
    pub random_querystring: Option<bool>,
    pub alarm_on_change: Option<bool>,
    pub retries: Option<u8>,
    pub failure_threshold: Option<u8>,
    pub regions: Option<Vec<String>>,
    pub verbose: Option<bool>,
    pub remark: Option<String>,
}
178
/// Field-less handle for queries against the `http_detectors` table; all
/// methods take the connection pool as an argument.
pub struct HttpDetectorModel {}
180
181impl HttpDetectorModel {
182 pub async fn list_enabled(&self, pool: &Pool<Postgres>) -> Result<Vec<HttpDetector>> {
183 let detectors = sqlx::query_as::<_, HttpDetectorSchema>(
184 r#"SELECT * FROM http_detectors WHERE deleted_at IS NULL AND status = 1"#,
185 )
186 .fetch_all(pool)
187 .await
188 .context(SqlxSnafu)?;
189
190 Ok(detectors.into_iter().map(|schema| schema.into()).collect())
191 }
192 pub async fn list_enabled_by_region(
193 &self,
194 pool: &Pool<Postgres>,
195 region: Option<String>,
196 limit: u64,
197 offset: u64,
198 ) -> Result<Vec<HttpDetector>> {
199 let region = region.unwrap_or(REGION_ANY.to_string());
200 let detectors = sqlx::query_as::<_, HttpDetectorSchema>(
201 r#"SELECT * FROM http_detectors WHERE deleted_at IS NULL AND status = 1 AND (jsonb_array_length(regions) = 0 OR regions @> $1::jsonb OR regions @> $2::jsonb) ORDER BY id ASC LIMIT $3 OFFSET $4"#,
202 )
203 .bind(format!("[{:?}]", region))
204 .bind(format!("[{:?}]", REGION_ANY))
205 .bind(limit as i64)
206 .bind(offset as i64)
207 .fetch_all(pool)
208 .await
209 .context(SqlxSnafu)?;
210
211 Ok(detectors.into_iter().map(|schema| schema.into()).collect())
212 }
213}
214
215impl Model for HttpDetectorModel {
216 type Output = HttpDetector;
217 fn new() -> Self {
218 Self {}
219 }
220 async fn schema_view(&self, pool: &Pool<Postgres>) -> SchemaView {
221 let mut group_options = vec![];
222 let group_model = DetectorGroupModel {};
223 if let Ok(groups) = group_model.list_enabled(pool).await {
224 for group in groups {
225 group_options.push(SchemaOption {
226 label: group.name,
227 value: SchemaOptionValue::Integer(group.id),
228 });
229 }
230 group_options.sort_by_key(|option| option.label.clone());
231 }
232 SchemaView {
233 schemas: vec![
234 Schema::new_id(),
235 Schema::new_name(),
236 Schema {
237 name: "group_id".to_string(),
238 category: SchemaType::Number,
239 required: true,
240 options: Some(group_options),
241 ..Default::default()
242 },
243 Schema {
244 name: "url".to_string(),
245 span: Some(2),
246 category: SchemaType::String,
247 required: true,
248 ..Default::default()
249 },
250 Schema {
251 name: "interval".to_string(),
252 category: SchemaType::Number,
253 default_value: Some(serde_json::json!(5)),
254 ..Default::default()
255 },
256 Schema {
257 name: "method".to_string(),
258 category: SchemaType::String,
259 options: Some(new_schema_options(&["GET", "POST", "PUT", "DELETE"])),
260 default_value: Some(serde_json::json!("GET")),
261 ..Default::default()
262 },
263 Schema {
264 name: "alpn_protocols".to_string(),
265 category: SchemaType::Strings,
266 options: Some(new_schema_options(&["http/1.1", "h2", "h3"])),
267 ..Default::default()
268 },
269 Schema {
270 name: "regions".to_string(),
271 category: SchemaType::Strings,
272 options: Some(new_schema_options(&[
273 REGION_ANY,
274 REGION_TX,
275 REGION_GZ,
276 REGION_ALIYUN,
277 ])),
278 ..Default::default()
279 },
280 Schema {
281 name: "resolves".to_string(),
282 category: SchemaType::Strings,
283 ..Default::default()
284 },
285 Schema {
286 name: "headers".to_string(),
287 category: SchemaType::Json,
288 hidden_values: vec!["{}".to_string(), "[]".to_string()],
289 ..Default::default()
290 },
291 Schema {
292 name: "alarm_url".to_string(),
293 category: SchemaType::String,
294 popover: true,
295 ..Default::default()
296 },
297 Schema {
298 name: "alarm_on_change".to_string(),
299 category: SchemaType::Boolean,
300 default_value: Some(serde_json::json!(false)),
301 ..Default::default()
302 },
303 Schema {
304 name: "retries".to_string(),
305 category: SchemaType::Number,
306 default_value: Some(serde_json::json!(0)),
307 ..Default::default()
308 },
309 Schema {
310 name: "failure_threshold".to_string(),
311 category: SchemaType::Number,
312 default_value: Some(serde_json::json!(0)),
313 ..Default::default()
314 },
315 Schema {
316 name: "ip_version".to_string(),
317 category: SchemaType::Number,
318 default_value: Some(serde_json::json!(0)),
319 hidden_values: vec!["0".to_string()],
320 ..Default::default()
321 },
322 Schema {
323 name: "skip_verify".to_string(),
324 category: SchemaType::Boolean,
325 default_value: Some(serde_json::json!(false)),
326 ..Default::default()
327 },
328 Schema {
329 name: "random_querystring".to_string(),
330 category: SchemaType::Boolean,
331 default_value: Some(serde_json::json!(false)),
332 ..Default::default()
333 },
334 Schema {
335 name: "verbose".to_string(),
336 category: SchemaType::Boolean,
337 default_value: Some(serde_json::json!(false)),
338 ..Default::default()
339 },
340 Schema {
341 name: "dns_servers".to_string(),
342 category: SchemaType::Strings,
343 ..Default::default()
344 },
345 Schema {
346 name: "body".to_string(),
347 category: SchemaType::Json,
348 popover: true,
349 ..Default::default()
350 },
351 Schema {
352 name: "script".to_string(),
353 category: SchemaType::Code,
354 span: Some(2),
355 popover: true,
356 ..Default::default()
357 },
358 Schema::new_status(),
359 Schema::new_remark(),
360 Schema::new_created(),
361 Schema::new_modified(),
362 ],
363 allow_edit: SchemaAllowEdit {
364 owner: true,
365 roles: vec!["*".to_string()],
366 ..Default::default()
367 },
368 allow_create: SchemaAllowCreate {
369 roles: vec!["*".to_string()],
370 ..Default::default()
371 },
372 }
373 }
374
375 fn push_filter_conditions<'args>(
376 &self,
377 qb: &mut QueryBuilder<'args, Postgres>,
378 filters: &HashMap<String, String>,
379 ) -> Result<()> {
380 if let Some(status) = filters.get("status").and_then(|s| s.parse::<i16>().ok()) {
381 qb.push(" AND status = ");
382 qb.push_bind(status);
383 }
384 Ok(())
385 }
386 async fn insert(&self, pool: &Pool<Postgres>, params: serde_json::Value) -> Result<u64> {
387 let params: HttpDetectorInsertParams = serde_json::from_value(params).context(JsonSnafu)?;
388 let row: (i64,) = sqlx::query_as(
389 r#"INSERT INTO http_detectors (status, name, group_id, url, method, alpn_protocols, resolves, headers, ip_version, skip_verify, body, "interval", script, alarm_url, random_querystring, alarm_on_change, retries, failure_threshold, "verbose", regions, created_by, remark) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22) RETURNING id"#,
390 )
391 .bind(params.status)
392 .bind(params.name)
393 .bind(params.group_id as i64)
394 .bind(params.url)
395 .bind(params.method)
396 .bind(params.alpn_protocols.map(Json).unwrap_or_default())
397 .bind(params.resolves.map(Json).unwrap_or_default())
398 .bind(params.headers.map(Json).unwrap_or_default())
399 .bind(params.ip_version as i16)
400 .bind(params.skip_verify)
401 .bind(params.body)
402 .bind(params.interval as i16)
403 .bind(params.script)
404 .bind(params.alarm_url.unwrap_or_default())
405 .bind(params.random_querystring)
406 .bind(params.alarm_on_change)
407 .bind(params.retries as i16)
408 .bind(params.failure_threshold as i16)
409 .bind(params.verbose)
410 .bind(Json(params.regions))
411 .bind(params.created_by as i64)
412 .bind(params.remark)
413 .fetch_one(pool)
414 .await
415 .context(SqlxSnafu)?;
416
417 Ok(row.0 as u64)
418 }
419 async fn get_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<Option<Self::Output>> {
420 let result = sqlx::query_as::<_, HttpDetectorSchema>(
421 r#"SELECT * FROM http_detectors WHERE id = $1 AND deleted_at IS NULL"#,
422 )
423 .bind(id as i64)
424 .fetch_optional(pool)
425 .await
426 .context(SqlxSnafu)?;
427
428 Ok(result.map(|schema| schema.into()))
429 }
430 async fn delete_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<()> {
431 sqlx::query(
432 r#"UPDATE http_detectors SET deleted_at = CURRENT_TIMESTAMP WHERE id = $1 AND deleted_at IS NULL"#,
433 )
434 .bind(id as i64)
435 .execute(pool)
436 .await
437 .context(SqlxSnafu)?;
438
439 Ok(())
440 }
441 async fn update_by_id(
442 &self,
443 pool: &Pool<Postgres>,
444 id: u64,
445 params: serde_json::Value,
446 ) -> Result<()> {
447 let params: HttpDetectorUpdateParams = serde_json::from_value(params).context(JsonSnafu)?;
448
449 let _ = sqlx::query(
450 r#"UPDATE http_detectors SET status = COALESCE($1, status), name = COALESCE($2, name), group_id = COALESCE($3, group_id), url = COALESCE($4, url), method = COALESCE($5, method), alpn_protocols = COALESCE($6, alpn_protocols), resolves = COALESCE($7, resolves), headers = COALESCE($8, headers), ip_version = COALESCE($9, ip_version), skip_verify = COALESCE($10, skip_verify), body = COALESCE($11, body), "interval" = COALESCE($12, "interval"), script = COALESCE($13, script), alarm_url = COALESCE($14, alarm_url), random_querystring = COALESCE($15, random_querystring), alarm_on_change = COALESCE($16, alarm_on_change), retries = COALESCE($17, retries), failure_threshold = COALESCE($18, failure_threshold), "verbose" = COALESCE($19, "verbose"), regions = COALESCE($20, regions), remark = COALESCE($21, remark) WHERE id = $22 AND deleted_at IS NULL"#,
451 )
452 .bind(params.status)
453 .bind(params.name)
454 .bind(params.group_id.map(|v| v as i64))
455 .bind(params.url)
456 .bind(params.method)
457 .bind(params.alpn_protocols.map(Json))
458 .bind(params.resolves.map(Json))
459 .bind(params.headers.map(Json))
460 .bind(params.ip_version.map(|v| v as i16))
461 .bind(params.skip_verify)
462 .bind(params.body)
463 .bind(params.interval.map(|v| v as i16))
464 .bind(params.script)
465 .bind(params.alarm_url)
466 .bind(params.random_querystring)
467 .bind(params.alarm_on_change)
468 .bind(params.retries.map(|v| v as i16))
469 .bind(params.failure_threshold.map(|v| v as i16))
470 .bind(params.verbose)
471 .bind(params.regions.map(Json))
472 .bind(params.remark)
473 .bind(id as i64)
474 .execute(pool)
475 .await
476 .context(SqlxSnafu)?;
477
478 Ok(())
479 }
480 async fn count(&self, pool: &Pool<Postgres>, params: &ModelListParams) -> Result<i64> {
481 let mut qb = QueryBuilder::new("SELECT COUNT(*) FROM http_detectors");
482 self.push_conditions(&mut qb, params)?;
483 let count = qb
484 .build_query_scalar::<i64>()
485 .fetch_one(pool)
486 .await
487 .context(SqlxSnafu)?;
488 Ok(count)
489 }
490
491 async fn list(
492 &self,
493 pool: &Pool<Postgres>,
494 params: &ModelListParams,
495 ) -> Result<Vec<Self::Output>> {
496 let mut qb = QueryBuilder::new("SELECT * FROM http_detectors");
497 self.push_conditions(&mut qb, params)?;
498 params.push_pagination(&mut qb);
499 let detectors = qb
500 .build_query_as::<HttpDetectorSchema>()
501 .fetch_all(pool)
502 .await
503 .context(SqlxSnafu)?;
504 Ok(detectors.into_iter().map(|s| s.into()).collect())
505 }
506}