1use crate::graph::graph::GraphMod;
2use crate::graph::graph_run::GraphRun;
3use graph::{
4 graph::{Graph, GraphStatus},
5 registry::GraphRegistry,
6};
7use my_reg::{ControlEvent, ControlEventType, broadcast_event, deregister, register};
8use operators::factory::add_node_to_graph;
9use serde_yaml::Value;
10use tokio::sync::{
11 broadcast::{Receiver, Sender},
12 mpsc,
13};
14use tokio::time::timeout;
15use tracing::{debug, error, info};
16
17pub mod graph;
18pub mod my_reg;
19pub mod operators;
20mod ports;
21pub mod random;
22pub mod settings;
23mod shared_resource;
24pub mod timestamp;
25
/// Returns the sum of `left` and `right`.
pub fn add(left: u64, right: u64) -> u64 {
    right + left
}
29
30pub fn build_graph(yaml: Value) -> String {
31 let graph_name = yaml["name"].as_str().unwrap().to_string();
32
33 let mut graph = Graph::new(graph_name.clone());
34
35 for node in yaml["nodes"].as_sequence().unwrap() {
36 add_node_to_graph(node, &mut graph);
37 }
38
39 if yaml.get("edges").is_some() {
40 for edge in yaml["edges"].as_sequence().unwrap() {
41 let from = edge["from"].as_str().unwrap();
42 let to = edge["to"].as_str().unwrap();
43 let role = edge["role"].as_str().unwrap_or_else(|| "default");
44 graph.add_edge(String::from(from), String::from(to), String::from(role));
45 }
46 }
47
48 graph.no_processor();
49 graph.register();
50
51 let graph = GraphRegistry::get_graph(graph_name.as_str());
52 if graph.is_none() {
53 error!("Graph not found: {}", graph_name);
54 panic!("Graph not found: {}", graph_name);
55 }
56 let graph = graph.unwrap();
57
58 let mut graph = graph.lock().unwrap();
59
60 graph.make_active_if_ready();
61 graph.replace_runner();
62
63 graph_name
64}
65
66pub fn get_pending() -> Vec<String> {
67 let graph_registry = GraphRegistry::get_instance();
68
69 let pending_graphs = {
70 let graph_names = {
71 let graph_registry = graph_registry.lock().unwrap();
72 graph_registry.graph_names()
73 };
74 info!("GetPending graph_list={:?}", graph_names);
75 graph_names
76 .iter()
77 .filter(|name| {
78 let g = GraphRegistry::get_graph(name);
79 if g.is_none() {
80 error!("Graph not found: {}", name);
81 return true;
82 }
83 let g = g.unwrap();
84 let graph = g.lock();
85 if graph.is_err() {
86 error!("Graph is locked, skipping: {:?}", g);
87 return true;
88 }
89 let graph = graph.unwrap();
90
91 graph.runner.state == GraphStatus::Inactive
92 })
93 .map(|name| name.to_string())
94 .collect::<Vec<String>>()
95 };
96 info!("Pending graphs: {:?}", pending_graphs);
97 pending_graphs
98}
99
100pub async fn start_graphs() {
101 debug!("Starting graphs...");
102 let (tx, mut rx) = tokio::sync::mpsc::channel::<ControlEvent>(100);
103
104 register("startup", tx);
105
106 let my_duration = tokio::time::Duration::from_millis(2000);
107
108 loop {
109 let mut pending = get_pending();
110
111 if pending.len() == 0 {
112 info!("No pending graphs to start.");
113 break;
114 }
115 let msg = timeout(my_duration, rx.recv()).await;
116 match msg {
117 Ok(msg) => match msg {
118 Some(msg) => {
119 if msg.event_type == ControlEventType::GraphReplaced
120 && msg.graph_status == GraphStatus::Active
121 {
122 let matched = pending.iter().position(|x| x == &msg.graph_name);
123 if matched.is_some() {
124 pending.remove(matched.unwrap());
125 }
126 }
127 info!(
128 "Received Event: {:?} (Remaining={})",
129 serde_json::to_string(&msg),
130 pending.len(),
131 );
132 if pending.len() == 0 {
133 break;
134 }
135 }
136 None => {
137 panic!("No message received");
138 }
139 },
140 Err(_) => {
141 let pending_graphs = get_pending();
142
143 panic!("Timeout waiting for message: {}", pending_graphs.join(", "));
148 }
149 }
150 }
151 deregister::<ControlEvent>("startup");
152 info!("All systems a go!");
153
154 post_graph_hook();
155 info!("Post graph hook executed.");
156}
157
158pub fn post_graph_hook() {
159 let graph_registry = GraphRegistry::get_instance();
160
161 let graph_registry = graph_registry.lock().unwrap();
162 for graph in graph_registry.get_graphs() {
163 let graph = graph.lock().unwrap();
164
165 info!("Post start hook for graph: {}", graph.name);
166 graph.post_start_hook();
167 }
168}
169
170#[cfg(test)]
171mod saasexpress_core_tests {
172 use std::panic;
173 use std::thread::sleep;
174
175 use serde_json::json;
176 use tracing::{Level, debug, info, instrument};
177 use tracing_subscriber::layer::SubscriberExt;
178 use tracing_subscriber::util::SubscriberInitExt;
179
180 use crate::graph::graph::IntoGraphRunner;
181 use crate::graph::registry::GraphRegistry;
182 use crate::my_reg::broadcast_event;
183 use crate::operators::global_space::resource::WidgetsSharedService;
184 use crate::{graph::message::Message, settings::settings::env_settings};
188
189 use super::*;
190
    use std::sync::Once;

    // Guards one-time installation of the tracing subscriber across tests.
    static INIT: Once = Once::new();
194
    /// Test bootstrap: installs the tracing subscriber exactly once
    /// (honoring RUST_LOG, with a verbose crate-level default filter),
    /// then clears the shared graph registry so each test starts empty.
    pub fn initialize() {
        INIT.call_once(|| {
            tracing_subscriber::registry()
                .with(tracing_subscriber::EnvFilter::new(
                    std::env::var("RUST_LOG").unwrap_or_else(|_| {
                        "saasexpress_tenants=warn,saasexpress_core=debug,saasexpress=debug,tower_http=info,tokio=trace,runtime=trace".into()
                    }),
                ))
                .with(tracing_subscriber::fmt::layer().with_thread_ids(true))
                .init();

        });

        // Note: the registry is cleared on EVERY call, not just the first.
        GraphRegistry::get_instance().lock().unwrap().clear();
    }
213
    /// Per-test setup; currently just delegates to `initialize`.
    fn setup() {
        initialize();
    }
217 fn teardown() {
218 let graph_registry = GraphRegistry::get_instance();
219 let mut graph_registry = graph_registry.lock().unwrap();
220 graph_registry.clear();
221 }
222
    /// Runs `test` between `setup` and `teardown`, catching panics so that
    /// teardown always executes, then re-raises the failure via `assert!`.
    fn run_test<T>(test: T) -> ()
    where
        T: FnOnce() -> () + panic::UnwindSafe,
    {
        setup();
        let result = panic::catch_unwind(|| test());
        teardown();
        assert!(result.is_ok())
    }
232
233 #[test]
234 fn it_works() {
235 let result = add(2, 2);
236 assert_eq!(result, 4);
237 }
238
    /// `env_settings` should yield no entries for an unset "TEST" prefix.
    #[test]
    fn env_settings_works() {
        let settings = env_settings("TEST".to_string());
        assert_eq!(settings.len(), 0);
    }
244
    /// End-to-end: a single BufferToJSON node turns raw bytes into a JSON
    /// message that carries a `_ts` field.
    #[tokio::test]
    async fn buffertojson_works() {
        initialize();

        const GRAPH: &str = r#"
        name: buffer_to_json
        nodes:
          - id: start
            operator: BufferToJSON
        "#;
        let graph_name = build_graph(serde_yaml::from_str(GRAPH).unwrap());

        start_graphs().await;

        let runner = graph_name.into_graph_runner();

        assert_eq!(runner.name, "buffer_to_json");

        let response = runner.end_to_end("{}".as_bytes().to_vec()).await;

        debug!("Message: {:?}", response);
        let Message::JSON { message, .. } = response else {
            panic!("Expected JSON message");
        };

        // The converted payload gains a `_ts` key.
        assert_eq!(message.get("_ts").is_some(), true);

    }
274
    /// End-to-end: a single ClaimCheck node tags the payload with
    /// `claim_type: "filesystem"`.
    #[tokio::test]
    async fn claimcheck_works() {
        initialize();

        const GRAPH: &str = r#"
        name: claim_check
        nodes:
          - id: start
            operator: ClaimCheck
        "#;
        let graph_name = build_graph(serde_yaml::from_str(GRAPH).unwrap());

        start_graphs().await;

        let graph = graph_name.into_graph_runner();

        assert_eq!(graph.name, "claim_check");

        let response = graph.end_to_end_standard("{}".as_bytes().to_vec()).await;

        debug!("Message: {:?}", response);
        let Message::JSON { message, .. } = response else {
            panic!("Expected JSON message");
        };

        // A missing `claim_type` falls back to "" so the assert fails cleanly.
        assert_eq!(
            message
                .get("claim_type")
                .unwrap_or(&serde_json::Value::String("".to_string())),
            "filesystem"
        );

    }
309
    /// End-to-end: BufferToJSON feeding a Shell node (`bash pwd`); the
    /// combined response is expected to be a two-element JSON array.
    #[tokio::test]
    async fn shell_works() {
        initialize();

        const GRAPH: &str = r#"
        name: shell
        nodes:
          - id: start
            operator: BufferToJSON
          - id: shell
            operator: Shell
            config:
              command: bash
              args:
                - pwd

        edges:
          - from: start
            to: shell
        "#;
        let graph_name = build_graph(serde_yaml::from_str(GRAPH).unwrap());

        start_graphs().await;

        let graph = graph_name.into_graph_runner();

        assert_eq!(graph.name, "shell");

        let response = graph.end_to_end_2("{}".as_bytes().to_vec()).await;

        let Message::JSON { message, .. } = response else {
            panic!("Expected Standard message");
        };

        info!(
            "Message: {}",
            serde_json::to_string_pretty(&message).unwrap()
        );
        // One entry per node in the two-node pipeline.
        assert_eq!(message.as_array().unwrap().len(), 2);

    }
352
    /// End-to-end: a FanOut node duplicates the input into two Passthrough
    /// branches; the response is a two-element array, each carrying the
    /// original payload.
    #[tokio::test]
    async fn test_fan_out() {
        initialize();

        const GRAPH: &str = r#"
        name: fan_out
        nodes:
          - id: fanout
            operator: FanOut
          - id: fanout_1
            operator: Passthrough
          - id: fanout_2
            operator: Passthrough
        edges:
          - from: fanout
            to: fanout_1
          - from: fanout
            to: fanout_2
        "#;

        info!("Graph: {}", GRAPH);
        let graph_name = build_graph(serde_yaml::from_str(GRAPH).unwrap());

        start_graphs().await;

        let graph = graph_name.into_graph_runner();

        assert_eq!(graph.name, "fan_out");

        let response = graph.end_to_end_json(json!({"name":"joe"})).await;

        let Message::JSON { message, .. } = response else {
            panic!("Expected Standard message");
        };

        // Both fan-out branches should echo the input.
        assert_eq!(message.as_array().unwrap().len(), 2);
        assert_eq!(message[0].get("name").unwrap(), "joe");
        assert_eq!(message[1].get("name").unwrap(), "joe");

    }
394
    /// End-to-end: a Callout node invoking a second ("worker") graph; the
    /// result is a Tuple of the original bytes plus the worker's JSON reply.
    #[tokio::test]
    async fn test_callout() {
        initialize();

        const GRAPH: &str = r#"
        name: g_callout
        nodes:
          - id: n_callout
            operator: Callout
            config:
              graph_name: worker
        edges: []
        "#;

        const GRAPH_WORKER: &str = r#"
        name: worker
        nodes:
          - id: start
            operator: Stub
            config:
              name: Joe
        edges: []
        "#;

        info!("Graph: {}", GRAPH);
        // Build the worker first so the callout graph can resolve it.
        let _ = build_graph(serde_yaml::from_str(GRAPH_WORKER).unwrap());
        let graph_name = build_graph(serde_yaml::from_str(GRAPH).unwrap());

        start_graphs().await;

        let graph = graph_name.into_graph_runner();

        assert_eq!(graph.name, "g_callout");

        let response = graph.end_to_end_standard("hello".as_bytes().to_vec()).await;

        info!("Response : {:?}", response);

        let Message::Tuple {
            message_1,
            message_2,
            ..
        } = response
        else {
            panic!("Expected Standard message");
        };

        // First element: the untouched input payload.
        let Message::Standard { message, .. } = message_1.as_ref() else {
            panic!("Expected Standard message");
        };

        assert_eq!(message.to_vec(), "hello".as_bytes().to_vec());

        // Second element: the worker's stubbed JSON response.
        let Message::JSON { message, .. } = message_2.as_ref() else {
            panic!("Expected JSON message: {:?}", message_2);
        };

        let nm = message.get("name").unwrap();
        assert_eq!(nm, "Joe");

    }
466
    /// End-to-end: two graphs sharing a GlobalSpace resource. Exercises
    /// start/stop on each shared resource, drops the shared-service
    /// singleton, then verifies the worker graph still responds.
    #[tokio::test]
    async fn test_shared_resources() {
        initialize();

        const GRAPH_1: &str = r#"
        name: graph_1
        nodes:
          - id: global
            operator: GlobalSpace
        edges: []
        "#;

        const GRAPH_WORKER: &str = r#"
        name: worker
        nodes:
          - id: global
            operator: GlobalSpace
          - id: start
            operator: Stub
            config:
              name: Joe
        edges:
          - from: global
            to: start
        "#;

        let _ = build_graph(serde_yaml::from_str(GRAPH_1).unwrap());
        let graph_name = build_graph(serde_yaml::from_str(GRAPH_WORKER).unwrap());

        start_graphs().await;

        // Runs the worker graph end-to-end and checks the stubbed reply.
        async fn eval(graph_name: String) {
            let graph = graph_name.clone().into_graph_runner();

            assert_eq!(graph.name, "worker");

            let response = graph.end_to_end_standard("hello".as_bytes().to_vec()).await;

            info!("Response : {:?}", response);

            let Message::JSON { message, .. } = response else {
                panic!("Expected Message to be JSON: {:?}", response);
            };

            let nm = message.get("name").unwrap();
            assert_eq!(nm, "Joe");
        }

        // Scope the registry/graph locks so they are released before eval runs.
        {
            let graph_registry = GraphRegistry::get_instance();
            let graph_registry = graph_registry.lock().unwrap();
            let graph = graph_registry.get_graph_by_name(&graph_name).unwrap();

            let graph = graph.lock().unwrap();
            let ls = graph.shared_resources();
            ls.iter().for_each(|share| {
                let share = share.lock().unwrap();
                info!("Shared Resource: {:?}", share.purpose());

                share.start();
                share.stop();
            });
        }

        WidgetsSharedService::drop_instance();

        eval(graph_name.clone()).await;
    }
537
    /// End-to-end: builds the callout/worker pair, verifies it, then
    /// deregisters and rebuilds the callout graph and verifies the
    /// replacement behaves identically (simulated upgrade).
    #[tokio::test]
    async fn test_graph_upgrade() {
        initialize();

        const GRAPH: &str = r#"
        name: g_callout
        nodes:
          - id: n_callout
            operator: Callout
            config:
              graph_name: worker
        edges: []
        "#;

        const GRAPH_WORKER: &str = r#"
        name: worker
        nodes:
          - id: start
            operator: Stub
            config:
              name: Joe
          - id: global
            operator: GlobalSpace
        edges: []
        "#;

        info!("Graph: {}", GRAPH);
        let _ = build_graph(serde_yaml::from_str(GRAPH_WORKER).unwrap());
        let graph_name = build_graph(serde_yaml::from_str(GRAPH).unwrap());

        start_graphs().await;

        // Runs the callout graph end-to-end and checks both halves of the
        // Tuple response (echoed bytes + worker JSON).
        async fn eval(graph_name: String) {
            let graph = graph_name.clone().into_graph_runner();

            assert_eq!(graph.name, "g_callout");

            let response = graph.end_to_end_standard("hello".as_bytes().to_vec()).await;

            info!("Response : {:?}", response);

            let Message::Tuple {
                message_1,
                message_2,
                ..
            } = response
            else {
                panic!("Expected Tuple message");
            };

            let Message::Standard { message, .. } = message_1.as_ref() else {
                panic!("Expected First Message to be Standard");
            };

            assert_eq!(message.to_vec(), "hello".as_bytes().to_vec());

            let Message::JSON { message, .. } = message_2.as_ref() else {
                panic!("Expected Second Message to be JSON: {:?}", message_2);
            };

            let nm = message.get("name").unwrap();
            assert_eq!(nm, "Joe");
            debug!("Test passed for graph: {}", graph_name);
        }

        eval(graph_name.clone()).await;

        // Simulate an upgrade: drop the graph and build it again.
        Graph::deregister(graph_name);

        let graph_name = build_graph(serde_yaml::from_str(GRAPH).unwrap());

        start_graphs().await;

        eval(graph_name.clone()).await;

        teardown();
    }
620
    /// End-to-end: a single Settings node should pass the JSON payload
    /// through unchanged.
    #[tokio::test]
    async fn test_settings() {
        initialize();

        const GRAPH: &str = r#"
        name: settings
        nodes:
          - id: settings
            operator: Settings
        edges: []
        "#;

        info!("Graph: {}", GRAPH);

        let graph_name = build_graph(serde_yaml::from_str(GRAPH).unwrap());

        start_graphs().await;

        let graph = graph_name.into_graph_runner();

        assert_eq!(graph.name, "settings");

        let response = graph.end_to_end_json(json!({"name": "Joe"})).await;

        info!("Response : {:?}", response);

        let Message::JSON { message, .. } = response else {
            panic!("Expected Standard message");
        };

        assert_eq!(message.get("name").unwrap(), "Joe");

    }
655
    /// End-to-end: CanonicalModelSample accepts a payload that has the
    /// required `name` field and passes it through.
    #[tokio::test]
    async fn test_canodamo_sample_ok() {
        initialize();

        const GRAPH: &str = r#"
        name: canonical_model
        nodes:
          - id: start
            operator: CanonicalModelSample
        edges: []
        "#;

        info!("Graph: {}", GRAPH);

        let graph_name = build_graph(serde_yaml::from_str(GRAPH).unwrap());

        start_graphs().await;

        let graph = graph_name.into_graph_runner();

        assert_eq!(graph.name, "canonical_model");

        let response = graph.end_to_end_json(json!({"name": "Joe"})).await;

        info!("Response : {:?}", response);

        let Message::JSON { message, .. } = response else {
            panic!("Expected Standard message");
        };

        assert_eq!(message.get("name").unwrap(), "Joe");

    }
690
    /// End-to-end: CanonicalModelSample rejects a payload missing the
    /// required `name` field with a validation Error message.
    #[tokio::test]
    async fn test_canodamo_sample_error() {
        initialize();

        const GRAPH: &str = r#"
        name: canonical_model_err
        nodes:
          - id: start
            operator: CanonicalModelSample
        edges: []
        "#;

        info!("Graph: {}", GRAPH);

        let graph_name = build_graph(serde_yaml::from_str(GRAPH).unwrap());

        start_graphs().await;

        let graph = graph_name.into_graph_runner();

        assert_eq!(graph.name, "canonical_model_err");

        // `first` instead of the required `name` triggers validation failure.
        let response = graph.end_to_end_json(json!({"first": "Joe"})).await;

        info!("Response : {:?}", response);

        let Message::Error { error, .. } = response else {
            panic!("Expected Error message");
        };

        assert_eq!(
            error,
            "Canonical Model Validation Error - missing field `name`"
        );

    }
728
    /// End-to-end: a standalone AITool node echoes the JSON payload.
    #[tokio::test]
    async fn test_ai_tool() {
        initialize();

        const GRAPH_TOOL: &str = r#"
        name: ai_tool
        nodes:
          - id: start
            operator: AITool
            config:
              name: Joe
              schema:
                type: object
                properties:
                  name:
                    type: string
        edges: []
        "#;

        let graph_name = build_graph(serde_yaml::from_str(GRAPH_TOOL).unwrap());

        start_graphs().await;

        let graph = graph_name.into_graph_runner();

        assert_eq!(graph.name, "ai_tool");

        let response = graph.end_to_end_json(json!({"first": "Joe"})).await;

        info!("Response : {:?}", response);

        let Message::JSON { message, .. } = response else {
            panic!("Expected Error message");
        };

        // Exact serialized form proves the payload passed through untouched.
        assert_eq!(
            serde_json::to_string(&message).unwrap(),
            "{\"first\":\"Joe\"}"
        );

    }
771
    /// End-to-end: an AIAgent wired to a tool (via Callout), a stubbed
    /// system prompt, and a stubbed LLM response; asserts the agent
    /// surfaces the stubbed completion content as its `response` field.
    #[instrument]
    #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
    async fn test_ai_agent() {
        initialize();

        const GRAPH: &str = r#"
        name: ai_agent
        nodes:
          - id: start
            operator: AIAgent
          - id: tool_a
            operator: Callout
            config:
              graph_name: ai_tool

          - id: system_prompt
            operator: Stub
            config:
              content: |
                You are a shopping assistant. Use these functions:

                1. search_products: When user wants to find products (e.g., 'show me shirts')
                2. get_product_details: When user asks about a specific product ID (e.g., 'tell me about product p1')
                3. clarify_request: When user's request is unclear


          - id: chatgpt_llm
            operator: Stub
            config:
              choices:
                - index: 0
                  message:
                    role: assistant
                    annotations: []
                    content: |
                      {"something": {"returned": true}}
                  finish_reason: stop
              created: 0
              id: chatgpt-123
              model: gpt-3.5-turbo
              object: chat.completion
              service_tier: free
              system_fingerprint: "fingerprint-123"
              usage:
                completion_tokens: 0
                completion_tokens_details:
                  accepted_prediction_tokens: 0
                  audio_tokens: 0
                  reasoning_tokens: 0
                  rejected_prediction_tokens: 0
                prompt_tokens: 0
                prompt_tokens_details:
                  audio_tokens: 0
                  cached_tokens: 0
                total_tokens: 0

        edges:
          - from: start
            to: tool_a
            role: tool
          - from: start
            to: system_prompt
            role: prompt
          - from: start
            to: chatgpt_llm
            role: llm
        "#;

        const GRAPH_TOOL: &str = r#"
        name: ai_tool
        nodes:
          - id: start
            operator: AITool
            config:
              name: Joe
              schema:
                type: object
                properties:
                  name:
                    type: string
        edges: []
        "#;

        info!("Graph: {}", GRAPH);

        let graph_name = build_graph(serde_yaml::from_str(GRAPH).unwrap());

        let _ = build_graph(serde_yaml::from_str(GRAPH_TOOL).unwrap());

        start_graphs().await;

        let graph = graph_name.into_graph_runner();

        let response = graph
            .end_to_end_json(json!({"prompt": "Do something"}))
            .await;

        let Message::JSON { message, .. } = response else {
            panic!("Expected JSON message - {:?}", response);
        };

        // The agent wraps the stubbed completion (a JSON string ending in
        // a newline, per the YAML block scalar) in a `response` field.
        assert_eq!(
            serde_json::to_string(&message).unwrap(),
            "{\"response\":\"{\\\"something\\\": {\\\"returned\\\": true}}\\n\"}"
        );
        info!("Great, response as expected!");
    }
880
    /// Registers an mpsc channel, broadcasts a ControlEvent, and verifies
    /// the registered receiver observes the same event fields.
    #[tokio::test]
    async fn test_reg_example() {
        initialize();

        let (tx, mut rx) = mpsc::channel::<ControlEvent>(100);

        register("my_channel", tx);

        broadcast_event(ControlEvent {
            graph_id: "ai_agent".to_string(),
            graph_name: "ai_agent".to_string(),
            graph_status: GraphStatus::Active,
            event_type: ControlEventType::Notice,
            reason: "Test event".to_string(),
            operator_names: vec![],
        })
        .await;

        let returned = rx.recv().await;
        assert!(returned.is_some(), "Expected a message from the channel");
        let msg = returned.unwrap();
        assert_eq!(msg.graph_name, "ai_agent");
        assert_eq!(msg.event_type, ControlEventType::Notice);
        assert_eq!(msg.reason, "Test event");
    }
906}