api/
api.rs

1use std::{thread::spawn, time::SystemTime};
2
3use env_logger::{Builder, Env};
4use log::LevelFilter;
5use tether_agent::TetherAgentBuilder;
6use tether_utils::{
7    tether_playback::{PlaybackOptions, TetherPlaybackUtil},
8    tether_receive::{ReceiveOptions, receive},
9    tether_record::{RecordOptions, TetherRecordUtil},
10    tether_send::{SendOptions, send},
11    tether_topics::{TopicOptions, insights::Insights},
12};
13
14fn demo_receive() {
15    let mut tether_agent = TetherAgentBuilder::new("demoReceive")
16        .build()
17        .expect("failed to init/connect Tether Agent");
18
19    let options = ReceiveOptions::default();
20
21    receive(
22        &options,
23        &mut tether_agent,
24        |_channel_name, topic, decoded| {
25            let contents = decoded.unwrap_or("(empty/invalid message)".into());
26            println!("RECEIVE: \"{}\" :: {}", topic, contents);
27        },
28    )
29}
30
31fn demo_send() {
32    let mut tether_agent = TetherAgentBuilder::new("demoSend")
33        .build()
34        .expect("failed to init/connect Tether Agent");
35
36    let mut count = 0;
37
38    let options = SendOptions {
39        channel_name: Some("dummyData".into()),
40        channel_topic: None,
41        channel_id: None,
42        channel_role: None,
43        message_payload_json: None,
44        use_dummy_data: true,
45    };
46
47    loop {
48        std::thread::sleep(std::time::Duration::from_secs(1));
49        count += 1;
50        println!("SEND: sending message #{}", count);
51        send(&options, &mut tether_agent).expect("failed to send");
52    }
53}
54
55fn demo_topics() {
56    let mut tether_agent = TetherAgentBuilder::new("demoTopics")
57        .build()
58        .expect("failed to init/connect Tether Agent");
59
60    let options = TopicOptions {
61        topic: "#".into(),
62        sampler_interval: 1000,
63        graph_enable: false,
64    };
65
66    let mut insights = Insights::new(&options, &mut tether_agent);
67
68    loop {
69        while let Some((topic, payload)) = tether_agent.check_messages() {
70            if insights.update(&topic, payload) {
71                println!("TOPICS: Insights update: \n{}", insights);
72            }
73        }
74    }
75}
76
77fn demo_playback() {
78    let options = PlaybackOptions {
79        file_path: "./demo.json".into(),
80        override_topic: None,
81        loop_count: 1, // ignored anyway, in this case
82        loop_infinite: true,
83        ignore_ctrl_c: true, // this is important for programmatic use
84        playback_speed: 1.0,
85        topic_filters: None,
86    };
87
88    let tether_agent = TetherAgentBuilder::new("demoTopics")
89        .build()
90        .expect("failed to init/connect Tether Agent");
91
92    let player = TetherPlaybackUtil::new(options);
93    let stop_request_tx = player.get_stop_tx();
94
95    let start_time = SystemTime::now();
96
97    let handles = vec![
98        spawn(move || {
99            player.start(&tether_agent);
100        }),
101        spawn(move || {
102            let mut time_to_end = false;
103            while !time_to_end {
104                if let Ok(elapsed) = start_time.elapsed() {
105                    if elapsed > std::time::Duration::from_secs(3) {
106                        println!("Time to stop! {}s elapsed", elapsed.as_secs());
107                        stop_request_tx
108                            .send(true)
109                            .expect("failed to send stop request via channel");
110                        time_to_end = true;
111                    }
112                }
113            }
114            println!("Playback should have stopped now; wait 1 more seconds...");
115            std::thread::sleep(std::time::Duration::from_secs(1));
116
117            println!("...Bye");
118        }),
119    ];
120    for handle in handles {
121        handle.join().expect("failed to join handle");
122    }
123}
124
125fn demo_record() {
126    let mut tether_agent = TetherAgentBuilder::new("demoPlayback")
127        .build()
128        .expect("failed to init/connect Tether Agent");
129
130    let options = RecordOptions {
131        file_override_path: None,
132        file_base_path: "./".into(),
133        file_base_name: "recording".into(),
134        file_no_timestamp: false,
135        topic: "#".into(),
136        timing_nonzero_start: false,
137        timing_delay: None,
138        timing_duration: None,
139        ignore_ctrl_c: true, // this is important for programmatic use
140    };
141
142    let recorder = TetherRecordUtil::new(options);
143    let stop_request_tx = recorder.get_stop_tx();
144
145    let start_time = SystemTime::now();
146    let handles = vec![
147        spawn(move || {
148            let mut time_to_end = false;
149            while !time_to_end {
150                if let Ok(elapsed) = start_time.elapsed() {
151                    if elapsed > std::time::Duration::from_secs(3) {
152                        println!("Time to stop! {}s elapsed", elapsed.as_secs());
153                        stop_request_tx
154                            .send(true)
155                            .expect("failed to send stop request via channel");
156                        time_to_end = true;
157                    }
158                }
159            }
160            println!("Recording should have stopped now; wait 4 more seconds...");
161            std::thread::sleep(std::time::Duration::from_secs(4));
162            println!("...Bye");
163        }),
164        spawn(move || {
165            recorder.start_recording(&mut tether_agent);
166        }),
167    ];
168
169    for handle in handles {
170        handle.join().expect("RECORDER: failed to join handle");
171    }
172}
173
174fn main() {
175    println!(
176        "This example shows how the tether-utils library can be used programmatically,
177    i.e. not from the CLI"
178    );
179    println!("Press Ctrl+C to stop");
180
181    let mut env_builder = Builder::from_env(Env::default().default_filter_or("info"));
182    env_builder.filter_module("paho_mqtt", LevelFilter::Warn);
183    env_builder.init();
184
185    let handles = vec![
186        spawn(demo_receive),
187        spawn(demo_send),
188        spawn(demo_topics),
189        spawn(|| {
190            std::thread::sleep(std::time::Duration::from_secs(4));
191            demo_playback();
192        }),
193        spawn(demo_record),
194    ];
195
196    for handle in handles {
197        handle.join().expect("failed to join handle");
198    }
199}