1use std::collections::HashMap;
2
3use comfy_table::{ContentArrangement, Table};
4use reqwest::Method;
5use serde_json::{json, Value};
6
7use crate::interaction::InteractionTrait;
8use crate::{datetime, http};
9
/// Client for the HTTP API exposed at `endpoint`.
///
/// Every method builds its request URL by appending an API path
/// (for example `/scheduler/job`) to `endpoint`.
#[derive(Debug, Clone)]
pub struct AGScheduler {
    /// Base URL of the API; request paths are appended to it verbatim.
    pub endpoint: String,
}
13
14impl AGScheduler {
15 async fn _edit_job(
16 &self,
17 data: HashMap<&str, String>,
18 method: Method,
19 interaction: &dyn InteractionTrait,
20 ) {
21 let name = interaction.input_name(data.get("name").unwrap());
22 let _type = interaction.select_type().to_lowercase();
23
24 let mut start_at = String::new();
25 let mut interval = String::new();
26 let mut cron_expr = String::new();
27 match _type.as_str() {
28 "datetime" => {
29 start_at = interaction.input_start_at(data.get("start_at").unwrap());
30 }
31 "interval" => {
32 interval = interaction.input_interval(data.get("interval").unwrap());
33 }
34 "cron" => {
35 cron_expr = interaction.input_cron_expr(data.get("cron_expr").unwrap());
36 }
37 _ => {}
38 }
39
40 let mut tz = iana_time_zone::get_timezone().unwrap();
41 if !data.get("timezone").unwrap().is_empty() {
42 tz = data.get("timezone").unwrap().to_string();
43 }
44 let timezone = interaction.input_timezone(&tz);
45
46 let mut fn_selections: Vec<String> = vec![];
47 match http::fetch(
48 format!("{}{}", &self.endpoint, "/funcs"),
49 http::Options::default(),
50 )
51 .await
52 {
53 Ok(result) => {
54 if let Value::Array(list) = result {
55 for f in list {
56 let f_name = f["name"].as_str().unwrap().to_string();
57 fn_selections.push(f_name);
58 }
59 }
60 }
61 Err(err) => {
62 println!("Error: {}", err);
63 return;
64 }
65 }
66 let func_name = interaction.select_func_name(fn_selections);
67
68 let args = interaction.input_args(data.get("args").unwrap());
69 let timeout = interaction.input_timeout(data.get("timeout").unwrap());
70 let queues = interaction.input_queues(data.get("queues").unwrap());
71
72 let args_value: Value = serde_json::from_str(&args).unwrap();
73 let queues_value: Value = serde_json::from_str(&queues).unwrap();
74 let body = json!(
75 {
76 "id": data.get("id").unwrap(),
77 "name": name,
78 "type": _type,
79 "start_at": start_at,
80 "interval": interval,
81 "cron_expr": cron_expr,
82 "timezone": timezone,
83 "func_name": func_name,
84 "args": args_value,
85 "timeout": timeout,
86 "queues": queues_value,
87 }
88 );
89 http::fetch_show_json(
90 format!("{}{}", &self.endpoint, "/scheduler/job"),
91 http::Options {
92 method,
93 body: body.to_string(),
94 ..Default::default()
95 },
96 )
97 .await;
98 }
99
100 pub async fn add_job(&self, interaction: &dyn InteractionTrait) {
101 let mut data = HashMap::new();
102 for key in [
103 "id",
104 "name",
105 "type",
106 "start_at",
107 "interval",
108 "cron_expr",
109 "timezone",
110 "func_name",
111 "args",
112 "timeout",
113 "queues",
114 ] {
115 data.insert(key, "".to_string());
116 }
117
118 self._edit_job(data, Method::POST, interaction).await;
119 }
120
121 pub async fn update_job(&self, interaction: &dyn InteractionTrait) {
122 let id = interaction.input_id();
123
124 let mut data = HashMap::new();
125 match http::fetch(
126 format!("{}{}/{}", &self.endpoint, "/scheduler/job", id),
127 http::Options::default(),
128 )
129 .await
130 {
131 Ok(result) => {
132 data.insert("id", id);
133 data.insert("args", result["args"].to_string());
134 data.insert("queues", result["queues"].to_string());
135
136 for key in [
137 "name",
138 "type",
139 "start_at",
140 "interval",
141 "cron_expr",
142 "timezone",
143 "func_name",
144 "timeout",
145 ] {
146 data.insert(key, result[key].as_str().unwrap().to_string());
147 }
148 }
149 Err(err) => {
150 println!("Error: {}", err);
151 return;
152 }
153 }
154
155 self._edit_job(data, Method::PUT, interaction).await;
156 }
157
158 pub async fn get_job(&self, interaction: &dyn InteractionTrait) {
159 let id = interaction.input_id();
160
161 http::fetch_show_json(
162 format!("{}{}/{}", &self.endpoint, "/scheduler/job", id),
163 http::Options::default(),
164 )
165 .await;
166 }
167
168 pub async fn get_all_jobs(&self) {
169 match http::fetch(
170 format!("{}{}", &self.endpoint, "/scheduler/jobs"),
171 http::Options::default(),
172 )
173 .await
174 {
175 Ok(result) => {
176 let mut table = Table::new();
177 table
178 .set_content_arrangement(ContentArrangement::Dynamic)
179 .set_header(vec![
180 "ID",
181 "Name",
182 "Type",
183 "TypeValue",
184 "LastRunTime",
185 "NextRunTime",
186 "Status",
187 ]);
188
189 if let Value::Array(list) = result {
190 let total = list.len();
191 for j in list {
192 let _type = j["type"].as_str().unwrap();
193 let mut type_value = "";
194 match _type {
195 "datetime" => {
196 type_value = j["start_at"].as_str().unwrap();
197 }
198 "interval" => {
199 type_value = j["interval"].as_str().unwrap();
200 }
201 "cron" => {
202 type_value = j["cron_expr"].as_str().unwrap();
203 }
204 _ => {}
205 }
206 let last_run_time =
207 datetime::parse_iso8601_to_local(j["last_run_time"].as_str().unwrap())
208 .unwrap()
209 .format("%Y-%m-%d %H:%M:%S")
210 .to_string();
211 let next_run_time =
212 datetime::parse_iso8601_to_local(j["next_run_time"].as_str().unwrap())
213 .unwrap()
214 .format("%Y-%m-%d %H:%M:%S")
215 .to_string();
216 table.add_row(vec![
217 j["id"].as_str().unwrap(),
218 j["name"].as_str().unwrap(),
219 _type,
220 type_value,
221 &last_run_time[..],
222 &next_run_time[..],
223 j["status"].as_str().unwrap(),
224 ]);
225 }
226
227 println!("{table}");
228 println!("Total {}", total);
229 }
230 }
231 Err(err) => {
232 println!("Error: {}", err)
233 }
234 }
235 }
236
237 pub async fn delete_job(&self, interaction: &dyn InteractionTrait) {
238 let id = interaction.input_id();
239
240 if !interaction.confirm_delete() {
241 return;
242 }
243
244 http::fetch_show_ok(
245 format!("{}{}/{id}", &self.endpoint, "/scheduler/job"),
246 http::Options {
247 method: Method::DELETE,
248 ..Default::default()
249 },
250 )
251 .await;
252 }
253
254 pub async fn delete_all_jobs(&self, interaction: &dyn InteractionTrait) {
255 if !interaction.confirm_delete() {
256 return;
257 }
258
259 http::fetch_show_ok(
260 format!("{}{}", &self.endpoint, "/scheduler/jobs"),
261 http::Options {
262 method: Method::DELETE,
263 ..Default::default()
264 },
265 )
266 .await;
267 }
268
269 pub async fn pause_or_resume_job(&self, action: &str, interaction: &dyn InteractionTrait) {
270 let id = interaction.input_id();
271
272 http::fetch_show_ok(
273 format!("{}{}/{}/{}", &self.endpoint, "/scheduler/job", id, action),
274 http::Options {
275 method: Method::POST,
276 ..Default::default()
277 },
278 )
279 .await;
280 }
281
282 pub async fn run_or_schedule_job(&self, action: &str, interaction: &dyn InteractionTrait) {
283 let id = interaction.input_id();
284
285 match http::fetch(
286 format!("{}{}/{}", &self.endpoint, "/scheduler/job", id),
287 http::Options::default(),
288 )
289 .await
290 {
291 Ok(result) => {
292 let args = result["args"].to_string();
293 let args_value: Value = serde_json::from_str(&args).unwrap();
294 let queues = result["queues"].to_string();
295 let queues_value: Value = serde_json::from_str(&queues).unwrap();
296 let body = json!(
297 {
298 "id": id,
299 "name": result["name"].as_str().unwrap().to_string(),
300 "type": result["type"].as_str().unwrap().to_string(),
301 "start_at": result["start_at"].as_str().unwrap().to_string(),
302 "interval": result["interval"].as_str().unwrap().to_string(),
303 "cron_expr": result["cron_expr"].as_str().unwrap().to_string(),
304 "timezone": result["timezone"].as_str().unwrap().to_string(),
305 "func_name": result["func_name"].as_str().unwrap().to_string(),
306 "args": args_value,
307 "timeout": result["timeout"].as_str().unwrap().to_string(),
308 "queues": queues_value,
309 }
310 );
311 http::fetch_show_ok(
312 format!("{}{}/{}", &self.endpoint, "/scheduler/job", action),
313 http::Options {
314 method: Method::POST,
315 body: body.to_string(),
316 ..Default::default()
317 },
318 )
319 .await;
320 }
321 Err(err) => {
322 println!("Error: {}", err);
323 }
324 }
325 }
326
327 pub async fn start_or_stop(&self, action: &str) {
328 http::fetch_show_ok(
329 format!("{}{}/{}", &self.endpoint, "/scheduler", action),
330 http::Options {
331 method: Method::POST,
332 ..Default::default()
333 },
334 )
335 .await;
336 }
337
338 async fn _get_records(&self, job_id: &str, interaction: &dyn InteractionTrait) {
339 let page = interaction.input_page("");
340 let page_size = interaction.input_page_size("");
341
342 let mut url_path = String::from("/recorder/records");
343 if !job_id.is_empty() {
344 url_path = format!("{}/{}", url_path, job_id);
345 }
346 let query: String = format!("page={}&page_size={}", page, page_size);
347 url_path = format!("{}?{}", url_path, query);
348 match http::fetch(
349 format!("{}{}", &self.endpoint, url_path),
350 http::Options::default(),
351 )
352 .await
353 {
354 Ok(result) => {
355 let mut table = Table::new();
356 table
357 .set_content_arrangement(ContentArrangement::Dynamic)
358 .set_header(vec![
359 "ID", "JobName", "JobId", "Status", "StartAt", "EndAt", "Result",
360 ]);
361
362 if let Value::Object(map) = result {
363 if let Value::Array(list) = &map["res"] {
364 for r in list {
365 let start_at =
366 datetime::parse_iso8601_to_local(r["start_at"].as_str().unwrap())
367 .unwrap()
368 .format("%Y-%m-%d %H:%M:%S")
369 .to_string();
370 let mut end_at = String::from("");
371 if r["status"] != "running" {
372 end_at =
373 datetime::parse_iso8601_to_local(r["end_at"].as_str().unwrap())
374 .unwrap()
375 .format("%Y-%m-%d %H:%M:%S")
376 .to_string();
377 }
378 table.add_row(vec![
379 &r["id"].to_string(),
380 r["job_name"].as_str().unwrap(),
381 r["job_id"].as_str().unwrap(),
382 r["status"].as_str().unwrap(),
383 &start_at[..],
384 &end_at[..],
385 r["result"].as_str().unwrap(),
386 ]);
387 }
388
389 println!("{table}");
390 }
391
392 let page = map["page"].to_string().parse::<f32>().unwrap();
393 let page_size = map["page_size"].to_string().parse::<f32>().unwrap();
394 let total = map["total"].to_string().parse::<f32>().unwrap();
395 let page_count = total / page_size;
396 println!(
397 "Page {}/{} PageSize {} Total {}",
398 page,
399 page_count.ceil(),
400 page_size,
401 total
402 );
403 }
404 }
405 Err(err) => {
406 println!("Error: {}", err)
407 }
408 }
409 }
410
411 pub async fn get_records(&self, interaction: &dyn InteractionTrait) {
412 let job_id = interaction.input_job_id();
413 self._get_records(&job_id, interaction).await;
414 }
415
416 pub async fn get_all_records(&self, interaction: &dyn InteractionTrait) {
417 self._get_records("", interaction).await;
418 }
419
420 async fn _delete_records(&self, job_id: &str, interaction: &dyn InteractionTrait) {
421 if !interaction.confirm_delete() {
422 return;
423 }
424
425 let mut url_path = String::from("/recorder/records");
426 if !job_id.is_empty() {
427 url_path = format!("{}/{}", url_path, job_id);
428 }
429 http::fetch_show_ok(
430 format!("{}{}", &self.endpoint, url_path),
431 http::Options {
432 method: Method::DELETE,
433 ..Default::default()
434 },
435 )
436 .await;
437 }
438
439 pub async fn delete_records(&self, interaction: &dyn InteractionTrait) {
440 let job_id = interaction.input_job_id();
441 self._delete_records(&job_id, interaction).await;
442 }
443
444 pub async fn delete_all_records(&self, interaction: &dyn InteractionTrait) {
445 self._delete_records("", interaction).await;
446 }
447
448 pub async fn get_info(&self) {
449 http::fetch_show_json(
450 format!("{}{}", &self.endpoint, "/info"),
451 http::Options::default(),
452 )
453 .await;
454 }
455
456 pub async fn get_funcs(&self) {
457 match http::fetch(
458 format!("{}{}", &self.endpoint, "/funcs"),
459 http::Options::default(),
460 )
461 .await
462 {
463 Ok(result) => {
464 let mut table = Table::new();
465 table
466 .set_content_arrangement(ContentArrangement::Dynamic)
467 .set_header(vec!["name", "info"]);
468
469 if let Value::Array(list) = result {
470 let total = list.len();
471 for f in list {
472 table.add_row(vec![
473 f["name"].as_str().unwrap(),
474 f["info"].as_str().unwrap(),
475 ]);
476 }
477
478 println!("{table}");
479 println!("Total {}", total);
480 }
481 }
482 Err(err) => {
483 println!("Error: {}", err)
484 }
485 }
486 }
487
488 pub async fn get_queues(&self) {
489 match http::fetch(
490 format!("{}{}", &self.endpoint, "/broker/queues"),
491 http::Options::default(),
492 )
493 .await
494 {
495 Ok(result) => {
496 let mut table = Table::new();
497 table
498 .set_content_arrangement(ContentArrangement::Dynamic)
499 .set_header(vec!["Name", "Type", "Count", "Workers"]);
500
501 if let Value::Array(list) = result {
502 let total = list.len();
503 for q in list {
504 table.add_row(vec![
505 q["name"].as_str().unwrap(),
506 q["type"].as_str().unwrap(),
507 &q["count"].to_string(),
508 &q["workers"].to_string(),
509 ]);
510 }
511
512 println!("{table}");
513 println!("Total {}", total);
514 }
515 }
516 Err(err) => {
517 println!("Error: {}", err)
518 }
519 }
520 }
521
522 pub async fn get_cluster_nodes(&self) {
523 match http::fetch(
524 format!("{}{}", &self.endpoint, "/cluster/nodes"),
525 http::Options::default(),
526 )
527 .await
528 {
529 Ok(result) => {
530 let mut table = Table::new();
531 table
532 .set_content_arrangement(ContentArrangement::Dynamic)
533 .set_header(vec![
534 "Endpoint",
535 "Leader",
536 "EndpointGRPC",
537 "EndpointHTTP",
538 "EndpointMain",
539 "Queue",
540 "Mode",
541 "Version",
542 "Health",
543 "RegisterTime",
544 "LastHeartbeatTime",
545 ]);
546
547 if let Value::Object(map) = result {
548 let total = map.len();
549 for (_, n) in map.iter() {
550 let mut is_leader = false;
551 if n["endpoint"] == n["endpoint_main"] {
552 is_leader = true;
553 }
554 let register_time =
555 datetime::parse_iso8601_to_local(n["register_time"].as_str().unwrap())
556 .unwrap()
557 .format("%Y-%m-%d %H:%M:%S")
558 .to_string();
559 let last_heartbeat_time = datetime::parse_iso8601_to_local(
560 n["last_heartbeat_time"].as_str().unwrap(),
561 )
562 .unwrap()
563 .format("%Y-%m-%d %H:%M:%S")
564 .to_string();
565 table.add_row(vec![
566 n["endpoint"].as_str().unwrap(),
567 &is_leader.to_string(),
568 n["endpoint_grpc"].as_str().unwrap(),
569 n["endpoint_http"].as_str().unwrap(),
570 n["endpoint_main"].as_str().unwrap(),
571 n["queue"].as_str().unwrap(),
572 n["mode"].as_str().unwrap(),
573 n["version"].as_str().unwrap(),
574 &n["health"].as_bool().unwrap().to_string()[..],
575 ®ister_time[..],
576 &last_heartbeat_time[..],
577 ]);
578 }
579
580 println!("{table}");
581 println!("Total {}", total);
582 }
583 }
584 Err(err) => {
585 println!("Error: {}", err)
586 }
587 }
588 }
589}
590
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    use crate::interaction::MockInteractionTrait;

    /// Smoke test: registers a mock response for every endpoint the client
    /// uses, then calls each public method once with scripted interaction
    /// answers.
    #[tokio::test]
    async fn it_api_client() {
        let mut server = mockito::Server::new_async().await;
        let url = server.url();

        let id = String::from("00227fbf671f4ed2");
        let job_id = String::from("b1638cfb7a8d4247");
        let page = String::from("1");
        let page_size = String::from("10");
        let empty_data = json!({"data": null, "error": ""}).to_string();

        // Endpoints whose mocked response carries no data.
        let ack_routes = [
            ("POST", String::from("/scheduler/job")),
            ("PUT", String::from("/scheduler/job")),
            ("DELETE", format!("/scheduler/job/{}", id)),
            ("DELETE", String::from("/scheduler/jobs")),
            ("POST", format!("/scheduler/job/{}/pause", id)),
            ("POST", format!("/scheduler/job/{}/resume", id)),
            ("POST", String::from("/scheduler/job/run")),
            ("POST", String::from("/scheduler/job/schedule")),
            ("POST", String::from("/scheduler/start")),
            ("POST", String::from("/scheduler/stop")),
            ("DELETE", format!("/recorder/records/{}", job_id)),
            ("DELETE", String::from("/recorder/records")),
        ];
        for (method, path) in ack_routes.iter() {
            server
                .mock(*method, path.as_str())
                .with_status(200)
                .with_body(&empty_data)
                .create_async()
                .await;
        }

        // Job fixture; only the identifying and scheduling fields vary.
        let job = |job_key: &str,
                   name: &str,
                   kind: &str,
                   interval: &str,
                   start_at: &str,
                   cron_expr: &str| {
            json!({
                "args": {},
                "cron_expr": cron_expr,
                "end_at": "",
                "func_name": "github.com/agscheduler/agscheduler/examples.PrintMsg",
                "id": job_key,
                "interval": interval,
                "last_run_time": "0001-01-01T00:00:00Z",
                "name": name,
                "next_run_time": "2024-04-15T04:19:12Z",
                "queues": [],
                "start_at": start_at,
                "status": "running",
                "timeout": "1h",
                "timezone": "UTC",
                "type": kind
            })
        };

        // Run-record fixture.
        let record = |record_id: i32,
                      owner: &str,
                      job_name: &str,
                      status: &str,
                      result: &str,
                      start_at: &str,
                      end_at: &str| {
            json!({
                "id": record_id,
                "job_id": owner,
                "job_name": job_name,
                "status": status,
                "result": result,
                "start_at": start_at,
                "end_at": end_at
            })
        };

        server
            .mock("GET", format!("/scheduler/job/{}", id).as_str())
            .with_status(200)
            .with_body(
                json!({
                    "data": job("00227fbf671f4ed2", "myJob", "interval", "60s", "", ""),
                    "error": ""
                })
                .to_string(),
            )
            .create_async()
            .await;
        server
            .mock("GET", "/scheduler/jobs")
            .with_status(200)
            .with_body(
                json!({
                    "data": [
                        job("00227fbf671f4ed2", "myJob", "interval", "60s", "", ""),
                        job("5hy65y56yh65y56h", "myJob2", "datetime", "", "2024-05-16 17:16:08", ""),
                        job("n4yhb56j3gj45h56", "myJob3", "cron", "", "", "*/1 * * * *")
                    ],
                    "error": ""
                })
                .to_string(),
            )
            .create_async()
            .await;
        server
            .mock(
                "GET",
                format!(
                    "/recorder/records/{}?page={}&page_size={}",
                    job_id, page, page_size
                )
                .as_str(),
            )
            .with_status(200)
            .with_body(
                json!({
                    "data": {
                        "page": 1,
                        "page_size": 10,
                        "res": [
                            record(
                                516544388,
                                "b1638cfb7a8d4247",
                                "myJob5",
                                "completed",
                                "",
                                "2024-06-03T11:27:28.002Z",
                                "2024-06-03T11:27:28.027Z"
                            )
                        ],
                        "total": 1
                    },
                    "error": ""
                })
                .to_string(),
            )
            .create_async()
            .await;
        server
            .mock(
                "GET",
                format!("/recorder/records?page={}&page_size={}", page, page_size).as_str(),
            )
            .with_status(200)
            .with_body(
                json!({
                    "data": {
                        "page": 1,
                        "page_size": 10,
                        "res": [
                            record(
                                516544388,
                                "b1638cfb7a8d4247",
                                "myJob5",
                                "completed",
                                "",
                                "2024-06-03T11:27:28.002Z",
                                "2024-06-03T11:27:28.027Z"
                            ),
                            record(
                                516541097,
                                "e99532afe9f44e63",
                                "myJob4",
                                "error",
                                "error: something error",
                                "2024-06-03T10:54:46.034Z",
                                "2024-06-03T10:54:51.069Z"
                            )
                        ],
                        "total": 2
                    },
                    "error": ""
                })
                .to_string(),
            )
            .create_async()
            .await;
        server
            .mock("GET", "/info")
            .with_status(200)
            .with_body(
                json!({
                    "data": {
                        "cluster_main_node": {
                            "endpoint": "127.0.0.1:36380",
                            "endpoint_grpc": "127.0.0.1:36360",
                            "endpoint_http": "127.0.0.1:36370",
                            "endpoint_main": "127.0.0.1:36380",
                            "mode": ""
                        },
                        "is_cluster_mode": true,
                        "is_running": false,
                        "version": "0.6.1"
                    },
                    "error": ""
                })
                .to_string(),
            )
            .create_async()
            .await;
        server
            .mock("GET", "/funcs")
            .with_status(200)
            .with_body(
                json!({
                    "data": [
                        {
                            "info": "",
                            "name": "github.com/agscheduler/agscheduler/examples.PrintMsg"
                        }
                    ],
                    "error": ""
                })
                .to_string(),
            )
            .create_async()
            .await;
        server
            .mock("GET", "/broker/queues")
            .with_status(200)
            .with_body(
                json!({
                    "data": [
                        {
                            "name": "default",
                            "type": "Memory",
                            "count": 1,
                            "workers": 2,
                        },
                    ],
                    "error": ""
                })
                .to_string(),
            )
            .create_async()
            .await;
        server
            .mock("GET", "/cluster/nodes")
            .with_status(200)
            .with_body(
                json!({
                    "data": {
                        "127.0.0.1:36380": {
                            "endpoint": "127.0.0.1:36380",
                            "endpoint_grpc": "127.0.0.1:36360",
                            "endpoint_http": "127.0.0.1:36370",
                            "endpoint_main": "127.0.0.1:36380",
                            "health": true,
                            "last_heartbeat_time": "2024-04-15T04:30:08.489043439Z",
                            "mode": "",
                            "queue": "default",
                            "register_time": "2024-04-15T04:08:10.438222846Z",
                            "version": "0.6.1"
                        }
                    },
                    "error": ""
                })
                .to_string(),
            )
            .create_async()
            .await;

        // Scripted answers for every interactive prompt.
        let mut mock = MockInteractionTrait::new();
        mock.expect_input_id().return_const(id);
        mock.expect_input_job_id().return_const(job_id);
        mock.expect_confirm_delete().return_const(true);
        mock.expect_input_name().return_const("myJob");
        mock.expect_input_start_at()
            .return_const("2024-04-16 15:23:51");
        mock.expect_input_interval().return_const("60s");
        mock.expect_input_cron_expr().return_const("*/1 * * * *");
        mock.expect_input_timezone().return_const("UTC");
        mock.expect_input_args().return_const("{}");
        mock.expect_input_timeout().return_const("1h");
        mock.expect_input_queues().return_const("[]");
        mock.expect_input_page().return_const(page);
        mock.expect_input_page_size().return_const(page_size);
        mock.expect_select_type().return_const("Interval");
        mock.expect_select_func_name()
            .return_const("github.com/agscheduler/agscheduler/examples.PrintMsg");

        let ags = AGScheduler { endpoint: url };

        ags.add_job(&mock).await;
        ags.get_job(&mock).await;
        ags.get_all_jobs().await;
        ags.update_job(&mock).await;
        ags.delete_job(&mock).await;
        ags.delete_all_jobs(&mock).await;
        ags.pause_or_resume_job("pause", &mock).await;
        ags.pause_or_resume_job("resume", &mock).await;
        ags.run_or_schedule_job("run", &mock).await;
        ags.run_or_schedule_job("schedule", &mock).await;
        ags.start_or_stop("start").await;
        ags.start_or_stop("stop").await;
        ags.get_records(&mock).await;
        ags.get_all_records(&mock).await;
        ags.delete_records(&mock).await;
        ags.delete_all_records(&mock).await;
        ags.get_info().await;
        ags.get_funcs().await;
        ags.get_queues().await;
        ags.get_cluster_nodes().await;
    }
}