1use std::{thread::spawn, time::SystemTime};
2
3use env_logger::{Builder, Env};
4use log::LevelFilter;
5use tether_agent::TetherAgentBuilder;
6use tether_utils::{
7 tether_playback::{PlaybackOptions, TetherPlaybackUtil},
8 tether_receive::{ReceiveOptions, receive},
9 tether_record::{RecordOptions, TetherRecordUtil},
10 tether_send::{SendOptions, send},
11 tether_topics::{TopicOptions, insights::Insights},
12};
13
14fn demo_receive() {
15 let mut tether_agent = TetherAgentBuilder::new("demoReceive")
16 .build()
17 .expect("failed to init/connect Tether Agent");
18
19 let options = ReceiveOptions::default();
20
21 receive(
22 &options,
23 &mut tether_agent,
24 |_channel_name, topic, decoded| {
25 let contents = decoded.unwrap_or("(empty/invalid message)".into());
26 println!("RECEIVE: \"{}\" :: {}", topic, contents);
27 },
28 )
29}
30
31fn demo_send() {
32 let mut tether_agent = TetherAgentBuilder::new("demoSend")
33 .build()
34 .expect("failed to init/connect Tether Agent");
35
36 let mut count = 0;
37
38 let options = SendOptions {
39 channel_name: Some("dummyData".into()),
40 channel_topic: None,
41 channel_id: None,
42 channel_role: None,
43 message_payload_json: None,
44 use_dummy_data: true,
45 };
46
47 loop {
48 std::thread::sleep(std::time::Duration::from_secs(1));
49 count += 1;
50 println!("SEND: sending message #{}", count);
51 send(&options, &mut tether_agent).expect("failed to send");
52 }
53}
54
55fn demo_topics() {
56 let mut tether_agent = TetherAgentBuilder::new("demoTopics")
57 .build()
58 .expect("failed to init/connect Tether Agent");
59
60 let options = TopicOptions {
61 topic: "#".into(),
62 sampler_interval: 1000,
63 graph_enable: false,
64 };
65
66 let mut insights = Insights::new(&options, &mut tether_agent);
67
68 loop {
69 while let Some((topic, payload)) = tether_agent.check_messages() {
70 if insights.update(&topic, payload) {
71 println!("TOPICS: Insights update: \n{}", insights);
72 }
73 }
74 }
75}
76
77fn demo_playback() {
78 let options = PlaybackOptions {
79 file_path: "./demo.json".into(),
80 override_topic: None,
81 loop_count: 1, loop_infinite: true,
83 ignore_ctrl_c: true, playback_speed: 1.0,
85 topic_filters: None,
86 };
87
88 let tether_agent = TetherAgentBuilder::new("demoTopics")
89 .build()
90 .expect("failed to init/connect Tether Agent");
91
92 let player = TetherPlaybackUtil::new(options);
93 let stop_request_tx = player.get_stop_tx();
94
95 let start_time = SystemTime::now();
96
97 let handles = vec![
98 spawn(move || {
99 player.start(&tether_agent);
100 }),
101 spawn(move || {
102 let mut time_to_end = false;
103 while !time_to_end {
104 if let Ok(elapsed) = start_time.elapsed() {
105 if elapsed > std::time::Duration::from_secs(3) {
106 println!("Time to stop! {}s elapsed", elapsed.as_secs());
107 stop_request_tx
108 .send(true)
109 .expect("failed to send stop request via channel");
110 time_to_end = true;
111 }
112 }
113 }
114 println!("Playback should have stopped now; wait 1 more seconds...");
115 std::thread::sleep(std::time::Duration::from_secs(1));
116
117 println!("...Bye");
118 }),
119 ];
120 for handle in handles {
121 handle.join().expect("failed to join handle");
122 }
123}
124
125fn demo_record() {
126 let mut tether_agent = TetherAgentBuilder::new("demoPlayback")
127 .build()
128 .expect("failed to init/connect Tether Agent");
129
130 let options = RecordOptions {
131 file_override_path: None,
132 file_base_path: "./".into(),
133 file_base_name: "recording".into(),
134 file_no_timestamp: false,
135 topic: "#".into(),
136 timing_nonzero_start: false,
137 timing_delay: None,
138 timing_duration: None,
139 ignore_ctrl_c: true, };
141
142 let recorder = TetherRecordUtil::new(options);
143 let stop_request_tx = recorder.get_stop_tx();
144
145 let start_time = SystemTime::now();
146 let handles = vec![
147 spawn(move || {
148 let mut time_to_end = false;
149 while !time_to_end {
150 if let Ok(elapsed) = start_time.elapsed() {
151 if elapsed > std::time::Duration::from_secs(3) {
152 println!("Time to stop! {}s elapsed", elapsed.as_secs());
153 stop_request_tx
154 .send(true)
155 .expect("failed to send stop request via channel");
156 time_to_end = true;
157 }
158 }
159 }
160 println!("Recording should have stopped now; wait 4 more seconds...");
161 std::thread::sleep(std::time::Duration::from_secs(4));
162 println!("...Bye");
163 }),
164 spawn(move || {
165 recorder.start_recording(&mut tether_agent);
166 }),
167 ];
168
169 for handle in handles {
170 handle.join().expect("RECORDER: failed to join handle");
171 }
172}
173
174fn main() {
175 println!(
176 "This example shows how the tether-utils library can be used programmatically,
177 i.e. not from the CLI"
178 );
179 println!("Press Ctrl+C to stop");
180
181 let mut env_builder = Builder::from_env(Env::default().default_filter_or("info"));
182 env_builder.filter_module("paho_mqtt", LevelFilter::Warn);
183 env_builder.init();
184
185 let handles = vec![
186 spawn(demo_receive),
187 spawn(demo_send),
188 spawn(demo_topics),
189 spawn(|| {
190 std::thread::sleep(std::time::Duration::from_secs(4));
191 demo_playback();
192 }),
193 spawn(demo_record),
194 ];
195
196 for handle in handles {
197 handle.join().expect("failed to join handle");
198 }
199}