1pub mod admin;
3pub mod broker;
4pub mod influx;
5pub mod mqtt;
6pub mod ota;
7pub mod settings;
8pub mod telemetry;
9
10pub use pyrinas_shared::*;
11
12use influxdb::{ReadQuery, WriteQuery};
14
15use flume::{Receiver, Sender};
17use pyrinas_shared::ota::{v2::OtaUpdate, OtaUpdateVersioned, OtaVersion};
18use std::{io, sync::Arc};
19
20use tokio::task;
22
23use librumqttd::async_locallink::construct_broker;
25
26use thiserror::Error;
28
29#[derive(Debug, Error)]
30pub enum Error {
31 #[error("{source}")]
32 SendError {
33 #[from]
34 source: flume::SendError<Event>,
35 },
36
37 #[error("{source}")]
43 MqttError {
44 #[from]
45 source: librumqttd::Error,
46 },
47
48 #[error("{source}")]
49 MqttLinkError {
50 #[from]
51 source: librumqttd::async_locallink::LinkError,
52 },
53
54 #[error("{source}")]
55 CborError {
56 #[from]
57 source: serde_cbor::Error,
58 },
59
60 #[error("{source}")]
61 SledError {
62 #[from]
63 source: sled::Error,
64 },
65
66 #[error("{source}")]
67 StringConversionError {
68 #[from]
69 source: std::string::FromUtf8Error,
70 },
71
72 #[error("{source}")]
73 IoError {
74 #[from]
75 source: io::Error,
76 },
77
78 #[error("{source}")]
79 TokioTaskError {
80 #[from]
81 source: tokio::task::JoinError,
82 },
83
84 #[error("err: {0}")]
85 CustomError(String),
86}
87
88pub async fn run(
90 settings: Arc<settings::PyrinasSettings>,
91 broker_sender: Sender<Event>,
92 broker_reciever: Receiver<Event>,
93) -> Result<(), Error> {
94 let task_sender = broker_sender.clone();
96 let task_settings = settings.clone();
97
98 if settings.clone().influx.is_some() {
100 task::spawn(async move {
101 influx::run(&task_settings.influx.to_owned().unwrap(), task_sender).await;
102 });
103 }
104
105 let task_sender = broker_sender.clone();
107 let task_settings = settings.clone();
108 task::spawn(async move {
109 ota::run(&task_settings.ota, task_sender).await;
110 });
111
112 let task_settings = settings.clone();
113 task::spawn(async move {
114 ota::ota_http_run(&task_settings.ota).await;
115 });
116
117 let task_sender = broker_sender.clone();
119 let task_settings = settings.clone();
120
121 if task_settings.admin.is_some() {
123 task::spawn(async move {
124 if let Err(e) = admin::run(&task_settings.admin.to_owned().unwrap(), task_sender).await
125 {
126 log::error!("Admin runtime error! Err: {}", e);
127 };
128 });
129 }
130
131 let (mut router, _, rumqtt_server, builder) = construct_broker(settings.mqtt.rumqtt.clone());
133
134 task::spawn(async {
136 rumqtt_server.await;
137 });
138
139 task::spawn_blocking(move || {
141 if let Err(e) = router.start() {
142 log::error!("mqtt router error. err: {}", e);
143 }
144 });
145
146 let (mut tx, mut rx) = builder.connect("localclient", 200).await?;
148
149 tx.subscribe(settings.mqtt.topics.clone()).await?;
151
152 let task_sender = broker_sender.clone();
154
155 task::spawn(async move {
157 mqtt::mqtt_run(&mut rx, task_sender).await;
158 });
159
160 let task_sender = broker_sender.clone();
162 task::spawn(async move {
163 mqtt::run(&mut tx, task_sender).await;
164 });
165
166 task::spawn(broker::run(broker_reciever)).await?;
170
171 Ok(())
172}
173
174#[derive(Debug, Clone)]
175pub enum Event {
176 NewRunner {
177 name: String,
178 sender: Sender<Event>,
179 },
180 OtaDeletePackage(String),
181 OtaNewPackage(OtaUpdate),
182 OtaUnlink {
183 device_id: Option<String>,
184 group_id: Option<String>,
185 },
186 OtaLink {
187 device_id: Option<String>,
188 group_id: Option<String>,
189 image_id: Option<String>,
190 ota_version: OtaVersion,
191 }, OtaRequest {
193 device_id: String,
194 msg: OtaRequest,
195 },
196 OtaResponse(OtaUpdateVersioned),
197 OtaUpdateImageListRequest(), OtaUpdateImageListRequestResponse(OtaImageListResponse), OtaUpdateGroupListRequest(), OtaUpdateGroupListRequestResponse(OtaGroupListResponse), ApplicationManagementRequest(ManagementData), ApplicationManagementResponse(ManagementData), ApplicationRequest(ApplicationData), ApplicationResponse(ApplicationData), InfluxDataSave(WriteQuery), InfluxDataRequest(ReadQuery), InfluxDataResponse, }