1use super::DetectorGroupModel;
16use serde::{Deserialize, Serialize};
17use snafu::ResultExt;
18use sqlx::FromRow;
19use sqlx::types::Json;
20use sqlx::{Pool, Postgres, QueryBuilder};
21use std::collections::HashMap;
22use tibba_model::{
23 Error, JsonSnafu, Model, ModelListParams, Schema, SchemaAllowCreate, SchemaAllowEdit,
24 SchemaOption, SchemaOptionValue, SchemaType, SchemaView, SqlxSnafu, format_datetime,
25 new_schema_options,
26};
27use time::PrimitiveDateTime;
28
/// Region tag `"any"`. `list_enabled_by_region` uses it as the fallback when no
/// region is supplied and always matches it in addition to the requested region.
pub const REGION_ANY: &str = "any";
/// Region tag `"tx"`, offered as a `regions` option in the schema view.
pub const REGION_TX: &str = "tx";
/// Region tag `"gz"`, offered as a `regions` option in the schema view.
pub const REGION_GZ: &str = "gz";
/// Region tag `"aliyun"`, offered as a `regions` option in the schema view.
pub const REGION_ALIYUN: &str = "aliyun";
33
/// Module-local result alias whose error type is fixed to the `tibba_model` `Error`.
type Result<T> = std::result::Result<T, Error>;
35
/// Raw row of the `http_detectors` table as decoded by sqlx (`FromRow`).
///
/// JSON columns stay wrapped in `Json<_>` and timestamps stay as
/// `PrimitiveDateTime`; the `From<HttpDetectorSchema>` impl for `HttpDetector`
/// unwraps and formats them for the public type.
#[derive(FromRow)]
struct HttpDetectorSchema {
    id: i64,
    // `list_enabled` and `list_enabled_by_region` select rows with `status = 1`.
    status: i16,
    name: String,
    interval: i16,
    url: String,
    method: String,
    // Nullable JSON array columns.
    alpn_protocols: Option<Json<Vec<String>>>,
    resolves: Option<Json<Vec<String>>>,
    // Nullable JSON object column (string to string).
    headers: Option<Json<HashMap<String, String>>>,
    ip_version: i16,
    skip_verify: bool,
    dns_servers: Option<Json<Vec<String>>>,
    // Raw bytes; nullable.
    body: Option<Vec<u8>>,
    script: Option<String>,
    alarm_url: String,
    random_querystring: bool,
    alarm_on_change: bool,
    retries: i16,
    failure_threshold: i16,
    // JSON array of region tags; `list_enabled_by_region` treats an empty
    // array as matching every region.
    regions: Json<Vec<String>>,
    group_id: i64,
    verbose: bool,
    created_by: i64,
    remark: String,
    created: PrimitiveDateTime,
    modified: PrimitiveDateTime,
}
65
/// Public, serde-(de)serializable view of one HTTP detector.
///
/// Built from `HttpDetectorSchema`: the JSON wrappers are removed and the
/// `created` / `modified` timestamps are rendered as strings by
/// `format_datetime`.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct HttpDetector {
    pub id: i64,
    // `list_enabled` and `list_enabled_by_region` only return rows with `status = 1`.
    pub status: i16,
    pub name: String,
    pub group_id: i64,
    pub interval: i16,
    pub url: String,
    pub method: String,
    pub alpn_protocols: Option<Vec<String>>,
    pub resolves: Option<Vec<String>>,
    pub headers: Option<HashMap<String, String>>,
    pub dns_servers: Option<Vec<String>>,
    pub ip_version: i16,
    pub skip_verify: bool,
    // Raw bytes; `None` when the column is NULL.
    pub body: Option<Vec<u8>>,
    pub script: Option<String>,
    pub alarm_url: String,
    pub random_querystring: bool,
    pub alarm_on_change: bool,
    pub retries: i16,
    pub failure_threshold: i16,
    // Region tags; `list_enabled_by_region` treats an empty list as matching
    // every region.
    pub regions: Vec<String>,
    pub verbose: bool,
    pub created_by: i64,
    pub remark: String,
    // Timestamp strings produced by `format_datetime`.
    pub created: String,
    pub modified: String,
}
95
96impl From<HttpDetectorSchema> for HttpDetector {
97 fn from(schema: HttpDetectorSchema) -> Self {
98 Self {
99 id: schema.id,
100 status: schema.status,
101 name: schema.name,
102 group_id: schema.group_id,
103 interval: schema.interval,
104 url: schema.url,
105 method: schema.method,
106 alpn_protocols: schema.alpn_protocols.map(|protocols| protocols.0),
107 resolves: schema.resolves.map(|resolves| resolves.0),
108 headers: schema.headers.map(|headers| headers.0),
109 dns_servers: schema.dns_servers.map(|dns_servers| dns_servers.0),
110 ip_version: schema.ip_version,
111 skip_verify: schema.skip_verify,
112 body: schema.body,
113 script: schema.script,
114 alarm_url: schema.alarm_url,
115 random_querystring: schema.random_querystring,
116 alarm_on_change: schema.alarm_on_change,
117 retries: schema.retries,
118 failure_threshold: schema.failure_threshold,
119 regions: schema.regions.0,
120 verbose: schema.verbose,
121 created_by: schema.created_by,
122 remark: schema.remark,
123 created: format_datetime(schema.created),
124 modified: format_datetime(schema.modified),
125 }
126 }
127}
128
/// Payload accepted by `Model::insert` for `HttpDetectorModel`, deserialized
/// from a JSON value.
///
/// On insert, omitted `alpn_protocols`, `resolves` and `headers` are stored as
/// their empty defaults and an omitted `alarm_url` is stored as an empty
/// string; `body` and `script` are bound as-is (so `None` is bound as NULL).
#[derive(Debug, Clone, Deserialize, Default)]
pub struct HttpDetectorInsertParams {
    pub status: i16,
    pub name: String,
    pub group_id: u64,
    pub url: String,
    pub method: String,
    pub alpn_protocols: Option<Vec<String>>,
    pub resolves: Option<Vec<String>>,
    pub headers: Option<HashMap<String, String>>,
    pub ip_version: i32,
    pub skip_verify: bool,
    pub body: Option<Vec<u8>>,
    pub script: Option<String>,
    pub alarm_url: Option<String>,
    pub interval: u16,
    pub random_querystring: bool,
    pub alarm_on_change: bool,
    pub retries: u8,
    pub failure_threshold: u8,
    // Stored as a JSON array.
    pub regions: Vec<String>,
    pub verbose: bool,
    pub created_by: u64,
    pub remark: String,
}
154
/// Payload accepted by `Model::update_by_id` for `HttpDetectorModel`,
/// deserialized from a JSON value.
///
/// Every field is optional. The UPDATE statement wraps each column in
/// `COALESCE($n, column)`, so a `None` field leaves the stored value untouched
/// (which also means a column cannot be reset to NULL through this payload).
#[derive(Debug, Clone, Deserialize, Default)]
pub struct HttpDetectorUpdateParams {
    pub status: Option<i16>,
    pub name: Option<String>,
    pub group_id: Option<u64>,
    pub url: Option<String>,
    pub method: Option<String>,
    pub alpn_protocols: Option<Vec<String>>,
    pub resolves: Option<Vec<String>>,
    pub headers: Option<HashMap<String, String>>,
    pub ip_version: Option<i32>,
    pub skip_verify: Option<bool>,
    pub alarm_url: Option<String>,
    pub body: Option<Vec<u8>>,
    pub interval: Option<u16>,
    pub script: Option<String>,
    pub random_querystring: Option<bool>,
    pub alarm_on_change: Option<bool>,
    pub retries: Option<u8>,
    pub failure_threshold: Option<u8>,
    pub regions: Option<Vec<String>>,
    pub verbose: Option<bool>,
    pub remark: Option<String>,
}
179
180pub struct HttpDetectorModel {}
181
182impl HttpDetectorModel {
183 pub async fn list_enabled(&self, pool: &Pool<Postgres>) -> Result<Vec<HttpDetector>> {
184 let detectors = sqlx::query_as::<_, HttpDetectorSchema>(
185 r#"SELECT * FROM http_detectors WHERE deleted_at IS NULL AND status = 1"#,
186 )
187 .fetch_all(pool)
188 .await
189 .context(SqlxSnafu)?;
190
191 Ok(detectors.into_iter().map(|schema| schema.into()).collect())
192 }
193 pub async fn list_enabled_by_region(
194 &self,
195 pool: &Pool<Postgres>,
196 region: Option<String>,
197 limit: u64,
198 offset: u64,
199 ) -> Result<Vec<HttpDetector>> {
200 let region = region.unwrap_or(REGION_ANY.to_string());
201 let detectors = sqlx::query_as::<_, HttpDetectorSchema>(
202 r#"SELECT * FROM http_detectors WHERE deleted_at IS NULL AND status = 1 AND (jsonb_array_length(regions) = 0 OR regions @> $1::jsonb OR regions @> $2::jsonb) ORDER BY id ASC LIMIT $3 OFFSET $4"#,
203 )
204 .bind(format!("[{:?}]", region))
205 .bind(format!("[{:?}]", REGION_ANY))
206 .bind(limit as i64)
207 .bind(offset as i64)
208 .fetch_all(pool)
209 .await
210 .context(SqlxSnafu)?;
211
212 Ok(detectors.into_iter().map(|schema| schema.into()).collect())
213 }
214}
215
216impl Model for HttpDetectorModel {
217 type Output = HttpDetector;
218 fn new() -> Self {
219 Self {}
220 }
221 async fn schema_view(&self, pool: &Pool<Postgres>) -> SchemaView {
222 let mut group_options = vec![];
223 let group_model = DetectorGroupModel {};
224 if let Ok(groups) = group_model.list_enabled(pool).await {
225 for group in groups {
226 group_options.push(SchemaOption {
227 label: group.name,
228 value: SchemaOptionValue::Integer(group.id),
229 });
230 }
231 group_options.sort_by_key(|option| option.label.clone());
232 }
233 SchemaView {
234 schemas: vec![
235 Schema::new_id(),
236 Schema::new_name(),
237 Schema {
238 name: "group_id".to_string(),
239 category: SchemaType::Number,
240 required: true,
241 options: Some(group_options),
242 ..Default::default()
243 },
244 Schema {
245 name: "url".to_string(),
246 span: Some(2),
247 category: SchemaType::String,
248 required: true,
249 ..Default::default()
250 },
251 Schema {
252 name: "interval".to_string(),
253 category: SchemaType::Number,
254 default_value: Some(serde_json::json!(5)),
255 ..Default::default()
256 },
257 Schema {
258 name: "method".to_string(),
259 category: SchemaType::String,
260 options: Some(new_schema_options(&["GET", "POST", "PUT", "DELETE"])),
261 default_value: Some(serde_json::json!("GET")),
262 ..Default::default()
263 },
264 Schema {
265 name: "alpn_protocols".to_string(),
266 category: SchemaType::Strings,
267 options: Some(new_schema_options(&["http/1.1", "h2", "h3"])),
268 ..Default::default()
269 },
270 Schema {
271 name: "regions".to_string(),
272 category: SchemaType::Strings,
273 options: Some(new_schema_options(&[
274 REGION_ANY,
275 REGION_TX,
276 REGION_GZ,
277 REGION_ALIYUN,
278 ])),
279 ..Default::default()
280 },
281 Schema {
282 name: "resolves".to_string(),
283 category: SchemaType::Strings,
284 ..Default::default()
285 },
286 Schema {
287 name: "headers".to_string(),
288 category: SchemaType::Json,
289 hidden_values: vec!["{}".to_string(), "[]".to_string()],
290 ..Default::default()
291 },
292 Schema {
293 name: "alarm_url".to_string(),
294 category: SchemaType::String,
295 popover: true,
296 ..Default::default()
297 },
298 Schema {
299 name: "alarm_on_change".to_string(),
300 category: SchemaType::Boolean,
301 default_value: Some(serde_json::json!(false)),
302 ..Default::default()
303 },
304 Schema {
305 name: "retries".to_string(),
306 category: SchemaType::Number,
307 default_value: Some(serde_json::json!(0)),
308 ..Default::default()
309 },
310 Schema {
311 name: "failure_threshold".to_string(),
312 category: SchemaType::Number,
313 default_value: Some(serde_json::json!(0)),
314 ..Default::default()
315 },
316 Schema {
317 name: "ip_version".to_string(),
318 category: SchemaType::Number,
319 default_value: Some(serde_json::json!(0)),
320 hidden_values: vec!["0".to_string()],
321 ..Default::default()
322 },
323 Schema {
324 name: "skip_verify".to_string(),
325 category: SchemaType::Boolean,
326 default_value: Some(serde_json::json!(false)),
327 ..Default::default()
328 },
329 Schema {
330 name: "random_querystring".to_string(),
331 category: SchemaType::Boolean,
332 default_value: Some(serde_json::json!(false)),
333 ..Default::default()
334 },
335 Schema {
336 name: "verbose".to_string(),
337 category: SchemaType::Boolean,
338 default_value: Some(serde_json::json!(false)),
339 ..Default::default()
340 },
341 Schema {
342 name: "dns_servers".to_string(),
343 category: SchemaType::Strings,
344 ..Default::default()
345 },
346 Schema {
347 name: "body".to_string(),
348 category: SchemaType::Json,
349 popover: true,
350 ..Default::default()
351 },
352 Schema {
353 name: "script".to_string(),
354 category: SchemaType::Code,
355 span: Some(2),
356 popover: true,
357 ..Default::default()
358 },
359 Schema::new_status(),
360 Schema::new_remark(),
361 Schema::new_created(),
362 Schema::new_modified(),
363 ],
364 allow_edit: SchemaAllowEdit {
365 owner: true,
366 roles: vec!["*".to_string()],
367 ..Default::default()
368 },
369 allow_create: SchemaAllowCreate {
370 roles: vec!["*".to_string()],
371 ..Default::default()
372 },
373 }
374 }
375
376 fn push_filter_conditions<'args>(
377 &self,
378 qb: &mut QueryBuilder<'args, Postgres>,
379 filters: &HashMap<String, String>,
380 ) -> Result<()> {
381 if let Some(status) = filters.get("status").and_then(|s| s.parse::<i16>().ok()) {
382 qb.push(" AND status = ");
383 qb.push_bind(status);
384 }
385 Ok(())
386 }
387 async fn insert(&self, pool: &Pool<Postgres>, params: serde_json::Value) -> Result<u64> {
388 let params: HttpDetectorInsertParams = serde_json::from_value(params).context(JsonSnafu)?;
389 let row: (i64,) = sqlx::query_as(
390 r#"INSERT INTO http_detectors (status, name, group_id, url, method, alpn_protocols, resolves, headers, ip_version, skip_verify, body, "interval", script, alarm_url, random_querystring, alarm_on_change, retries, failure_threshold, "verbose", regions, created_by, remark) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22) RETURNING id"#,
391 )
392 .bind(params.status)
393 .bind(params.name)
394 .bind(params.group_id as i64)
395 .bind(params.url)
396 .bind(params.method)
397 .bind(params.alpn_protocols.map(Json).unwrap_or_default())
398 .bind(params.resolves.map(Json).unwrap_or_default())
399 .bind(params.headers.map(Json).unwrap_or_default())
400 .bind(params.ip_version as i16)
401 .bind(params.skip_verify)
402 .bind(params.body)
403 .bind(params.interval as i16)
404 .bind(params.script)
405 .bind(params.alarm_url.unwrap_or_default())
406 .bind(params.random_querystring)
407 .bind(params.alarm_on_change)
408 .bind(params.retries as i16)
409 .bind(params.failure_threshold as i16)
410 .bind(params.verbose)
411 .bind(Json(params.regions))
412 .bind(params.created_by as i64)
413 .bind(params.remark)
414 .fetch_one(pool)
415 .await
416 .context(SqlxSnafu)?;
417
418 Ok(row.0 as u64)
419 }
420 async fn get_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<Option<Self::Output>> {
421 let result = sqlx::query_as::<_, HttpDetectorSchema>(
422 r#"SELECT * FROM http_detectors WHERE id = $1 AND deleted_at IS NULL"#,
423 )
424 .bind(id as i64)
425 .fetch_optional(pool)
426 .await
427 .context(SqlxSnafu)?;
428
429 Ok(result.map(|schema| schema.into()))
430 }
431 async fn delete_by_id(&self, pool: &Pool<Postgres>, id: u64) -> Result<()> {
432 sqlx::query(
433 r#"UPDATE http_detectors SET deleted_at = NOW(), modified = NOW() WHERE id = $1 AND deleted_at IS NULL"#,
434 )
435 .bind(id as i64)
436 .execute(pool)
437 .await
438 .context(SqlxSnafu)?;
439
440 Ok(())
441 }
442 async fn update_by_id(
443 &self,
444 pool: &Pool<Postgres>,
445 id: u64,
446 params: serde_json::Value,
447 ) -> Result<()> {
448 let params: HttpDetectorUpdateParams = serde_json::from_value(params).context(JsonSnafu)?;
449
450 let _ = sqlx::query(
451 r#"UPDATE http_detectors SET status = COALESCE($1, status), name = COALESCE($2, name), group_id = COALESCE($3, group_id), url = COALESCE($4, url), method = COALESCE($5, method), alpn_protocols = COALESCE($6, alpn_protocols), resolves = COALESCE($7, resolves), headers = COALESCE($8, headers), ip_version = COALESCE($9, ip_version), skip_verify = COALESCE($10, skip_verify), body = COALESCE($11, body), "interval" = COALESCE($12, "interval"), script = COALESCE($13, script), alarm_url = COALESCE($14, alarm_url), random_querystring = COALESCE($15, random_querystring), alarm_on_change = COALESCE($16, alarm_on_change), retries = COALESCE($17, retries), failure_threshold = COALESCE($18, failure_threshold), "verbose" = COALESCE($19, "verbose"), regions = COALESCE($20, regions), remark = COALESCE($21, remark), modified = NOW() WHERE id = $22 AND deleted_at IS NULL"#,
452 )
453 .bind(params.status)
454 .bind(params.name)
455 .bind(params.group_id.map(|v| v as i64))
456 .bind(params.url)
457 .bind(params.method)
458 .bind(params.alpn_protocols.map(Json))
459 .bind(params.resolves.map(Json))
460 .bind(params.headers.map(Json))
461 .bind(params.ip_version.map(|v| v as i16))
462 .bind(params.skip_verify)
463 .bind(params.body)
464 .bind(params.interval.map(|v| v as i16))
465 .bind(params.script)
466 .bind(params.alarm_url)
467 .bind(params.random_querystring)
468 .bind(params.alarm_on_change)
469 .bind(params.retries.map(|v| v as i16))
470 .bind(params.failure_threshold.map(|v| v as i16))
471 .bind(params.verbose)
472 .bind(params.regions.map(Json))
473 .bind(params.remark)
474 .bind(id as i64)
475 .execute(pool)
476 .await
477 .context(SqlxSnafu)?;
478
479 Ok(())
480 }
481 async fn count(&self, pool: &Pool<Postgres>, params: &ModelListParams) -> Result<i64> {
482 let mut qb = QueryBuilder::new("SELECT COUNT(*) FROM http_detectors");
483 self.push_conditions(&mut qb, params)?;
484 let count = qb
485 .build_query_scalar::<i64>()
486 .fetch_one(pool)
487 .await
488 .context(SqlxSnafu)?;
489 Ok(count)
490 }
491
492 async fn list(
493 &self,
494 pool: &Pool<Postgres>,
495 params: &ModelListParams,
496 ) -> Result<Vec<Self::Output>> {
497 let mut qb = QueryBuilder::new("SELECT * FROM http_detectors");
498 self.push_conditions(&mut qb, params)?;
499 params.push_pagination(&mut qb);
500 let detectors = qb
501 .build_query_as::<HttpDetectorSchema>()
502 .fetch_all(pool)
503 .await
504 .context(SqlxSnafu)?;
505 Ok(detectors.into_iter().map(|s| s.into()).collect())
506 }
507}