api/
api.rs

1use std::{thread::spawn, time::SystemTime};
2
3use env_logger::{Builder, Env};
4use log::LevelFilter;
5use tether_agent::TetherAgentOptionsBuilder;
6use tether_utils::{
7    tether_playback::{PlaybackOptions, TetherPlaybackUtil},
8    tether_receive::{receive, ReceiveOptions},
9    tether_record::{RecordOptions, TetherRecordUtil},
10    tether_send::{send, SendOptions},
11    tether_topics::{insights::Insights, TopicOptions},
12};
13
14fn demo_receive() {
15    let mut tether_agent = TetherAgentOptionsBuilder::new("demoReceive")
16        .build()
17        .expect("failed to init/connect Tether Agent");
18
19    let options = ReceiveOptions::default();
20
21    receive(&options, &mut tether_agent, |_plug_name, topic, decoded| {
22        let contents = decoded.unwrap_or("(empty/invalid message)".into());
23        println!("RECEIVE: \"{}\" :: {}", topic, contents);
24    })
25}
26
27fn demo_send() {
28    let mut tether_agent = TetherAgentOptionsBuilder::new("demoSend")
29        .build()
30        .expect("failed to init/connect Tether Agent");
31
32    let mut count = 0;
33
34    let options = SendOptions {
35        plug_name: Some("dummyData".into()),
36        plug_topic: None,
37        plug_id: None,
38        plug_role: None,
39        message_payload_json: None,
40        use_dummy_data: true,
41    };
42
43    loop {
44        std::thread::sleep(std::time::Duration::from_secs(1));
45        count += 1;
46        println!("SEND: sending message #{}", count);
47        send(&options, &mut tether_agent).expect("failed to send");
48    }
49}
50
51fn demo_topics() {
52    let mut tether_agent = TetherAgentOptionsBuilder::new("demoTopics")
53        .build()
54        .expect("failed to init/connect Tether Agent");
55
56    let options = TopicOptions {
57        topic: "#".into(),
58        sampler_interval: 1000,
59        graph_enable: false,
60    };
61
62    let mut insights = Insights::new(&options, &mut tether_agent);
63
64    loop {
65        while let Some((topic, payload)) = tether_agent.check_messages() {
66            if insights.update(&topic, payload) {
67                println!("TOPICS: Insights update: \n{}", insights);
68            }
69        }
70    }
71}
72
73fn demo_playback() {
74    let options = PlaybackOptions {
75        file_path: "./demo.json".into(),
76        override_topic: None,
77        loop_count: 1, // ignored anyway, in this case
78        loop_infinite: true,
79        ignore_ctrl_c: true, // this is important for programmatic use
80        playback_speed: 1.0,
81        topic_filters: None,
82    };
83
84    let tether_agent = TetherAgentOptionsBuilder::new("demoTopics")
85        .build()
86        .expect("failed to init/connect Tether Agent");
87
88    let player = TetherPlaybackUtil::new(options);
89    let stop_request_tx = player.get_stop_tx();
90
91    let start_time = SystemTime::now();
92
93    let handles = vec![
94        spawn(move || {
95            player.start(&tether_agent);
96        }),
97        spawn(move || {
98            let mut time_to_end = false;
99            while !time_to_end {
100                if let Ok(elapsed) = start_time.elapsed() {
101                    if elapsed > std::time::Duration::from_secs(3) {
102                        println!("Time to stop! {}s elapsed", elapsed.as_secs());
103                        stop_request_tx
104                            .send(true)
105                            .expect("failed to send stop request via channel");
106                        time_to_end = true;
107                    }
108                }
109            }
110            println!("Playback should have stopped now; wait 1 more seconds...");
111            std::thread::sleep(std::time::Duration::from_secs(1));
112
113            println!("...Bye");
114        }),
115    ];
116    for handle in handles {
117        handle.join().expect("failed to join handle");
118    }
119}
120
121fn demo_record() {
122    let mut tether_agent = TetherAgentOptionsBuilder::new("demoPlayback")
123        .build()
124        .expect("failed to init/connect Tether Agent");
125
126    let options = RecordOptions {
127        file_override_path: None,
128        file_base_path: "./".into(),
129        file_base_name: "recording".into(),
130        file_no_timestamp: false,
131        topic: "#".into(),
132        timing_nonzero_start: false,
133        timing_delay: None,
134        timing_duration: None,
135        ignore_ctrl_c: true, // this is important for programmatic use
136    };
137
138    let recorder = TetherRecordUtil::new(options);
139    let stop_request_tx = recorder.get_stop_tx();
140
141    let start_time = SystemTime::now();
142    let handles = vec![
143        spawn(move || {
144            let mut time_to_end = false;
145            while !time_to_end {
146                if let Ok(elapsed) = start_time.elapsed() {
147                    if elapsed > std::time::Duration::from_secs(3) {
148                        println!("Time to stop! {}s elapsed", elapsed.as_secs());
149                        stop_request_tx
150                            .send(true)
151                            .expect("failed to send stop request via channel");
152                        time_to_end = true;
153                    }
154                }
155            }
156            println!("Recording should have stopped now; wait 4 more seconds...");
157            std::thread::sleep(std::time::Duration::from_secs(4));
158            println!("...Bye");
159        }),
160        spawn(move || {
161            recorder.start_recording(&mut tether_agent);
162        }),
163    ];
164
165    for handle in handles {
166        handle.join().expect("RECORDER: failed to join handle");
167    }
168}
169
170fn main() {
171    println!(
172        "This example shows how the tether-utils library can be used programmatically,
173    i.e. not from the CLI"
174    );
175    println!("Press Ctrl+C to stop");
176
177    let mut env_builder = Builder::from_env(Env::default().default_filter_or("info"));
178    env_builder.filter_module("paho_mqtt", LevelFilter::Warn);
179    env_builder.init();
180
181    let handles = vec![
182        spawn(demo_receive),
183        spawn(demo_send),
184        spawn(demo_topics),
185        spawn(|| {
186            std::thread::sleep(std::time::Duration::from_secs(4));
187            demo_playback();
188        }),
189        spawn(demo_record),
190    ];
191
192    for handle in handles {
193        handle.join().expect("failed to join handle");
194    }
195}