pyrinas_server/
lib.rs

1// Lib related
2pub mod admin;
3pub mod broker;
4pub mod influx;
5pub mod mqtt;
6pub mod ota;
7pub mod settings;
8pub mod telemetry;
9
10pub use pyrinas_shared::*;
11
12// Influx/Event
13use influxdb::{ReadQuery, WriteQuery};
14
15// Async Related
16use flume::{Receiver, Sender};
17use pyrinas_shared::ota::{v2::OtaUpdate, OtaUpdateVersioned, OtaVersion};
18use std::{io, sync::Arc};
19
20// Runtime
21use tokio::task;
22
23// MQTT related
24use librumqttd::async_locallink::construct_broker;
25
26// Error
27use thiserror::Error;
28
29#[derive(Debug, Error)]
30pub enum Error {
31    #[error("{source}")]
32    SendError {
33        #[from]
34        source: flume::SendError<Event>,
35    },
36
37    // #[error("{source}")]
38    // TokioError {
39    //     #[from]
40    //     source: tokio::Error,
41    // },
42    #[error("{source}")]
43    MqttError {
44        #[from]
45        source: librumqttd::Error,
46    },
47
48    #[error("{source}")]
49    MqttLinkError {
50        #[from]
51        source: librumqttd::async_locallink::LinkError,
52    },
53
54    #[error("{source}")]
55    CborError {
56        #[from]
57        source: serde_cbor::Error,
58    },
59
60    #[error("{source}")]
61    SledError {
62        #[from]
63        source: sled::Error,
64    },
65
66    #[error("{source}")]
67    StringConversionError {
68        #[from]
69        source: std::string::FromUtf8Error,
70    },
71
72    #[error("{source}")]
73    IoError {
74        #[from]
75        source: io::Error,
76    },
77
78    #[error("{source}")]
79    TokioTaskError {
80        #[from]
81        source: tokio::task::JoinError,
82    },
83
84    #[error("err: {0}")]
85    CustomError(String),
86}
87
88// TODO: conditional use of tokio OR async_std
89pub async fn run(
90    settings: Arc<settings::PyrinasSettings>,
91    broker_sender: Sender<Event>,
92    broker_reciever: Receiver<Event>,
93) -> Result<(), Error> {
94    // Clone these appropriately
95    let task_sender = broker_sender.clone();
96    let task_settings = settings.clone();
97
98    // Init influx connection
99    if settings.clone().influx.is_some() {
100        task::spawn(async move {
101            influx::run(&task_settings.influx.to_owned().unwrap(), task_sender).await;
102        });
103    }
104
105    // Ota task
106    let task_sender = broker_sender.clone();
107    let task_settings = settings.clone();
108    task::spawn(async move {
109        ota::run(&task_settings.ota, task_sender).await;
110    });
111
112    let task_settings = settings.clone();
113    task::spawn(async move {
114        ota::ota_http_run(&task_settings.ota).await;
115    });
116
117    // Clone these appropriately
118    let task_sender = broker_sender.clone();
119    let task_settings = settings.clone();
120
121    // Start unix socket task
122    if task_settings.admin.is_some() {
123        task::spawn(async move {
124            if let Err(e) = admin::run(&task_settings.admin.to_owned().unwrap(), task_sender).await
125            {
126                log::error!("Admin runtime error! Err: {}", e);
127            };
128        });
129    }
130
131    // Set up broker
132    let (mut router, _, rumqtt_server, builder) = construct_broker(settings.mqtt.rumqtt.clone());
133
134    // Running switch
135    task::spawn(async {
136        rumqtt_server.await;
137    });
138
139    // Spawn router task (needs to be done before anything else or else builder.connect blocks)
140    task::spawn_blocking(move || {
141        if let Err(e) = router.start() {
142            log::error!("mqtt router error. err: {}", e);
143        }
144    });
145
146    // Get the rx/tx channels
147    let (mut tx, mut rx) = builder.connect("localclient", 200).await?;
148
149    // Subscribe
150    tx.subscribe(settings.mqtt.topics.clone()).await?;
151
152    // Spawn a new task(s) for the MQTT stuff
153    let task_sender = broker_sender.clone();
154
155    // Start server task
156    task::spawn(async move {
157        mqtt::mqtt_run(&mut rx, task_sender).await;
158    });
159
160    // Start mqtt broker task
161    let task_sender = broker_sender.clone();
162    task::spawn(async move {
163        mqtt::run(&mut tx, task_sender).await;
164    });
165
166    // Spawn the broker task that handles it all!
167    // This blocks this async function from returning.
168    // If this returns, the server it shot anyway..
169    task::spawn(broker::run(broker_reciever)).await?;
170
171    Ok(())
172}
173
174#[derive(Debug, Clone)]
175pub enum Event {
176    NewRunner {
177        name: String,
178        sender: Sender<Event>,
179    },
180    OtaDeletePackage(String),
181    OtaNewPackage(OtaUpdate),
182    OtaUnlink {
183        device_id: Option<String>,
184        group_id: Option<String>,
185    },
186    OtaLink {
187        device_id: Option<String>,
188        group_id: Option<String>,
189        image_id: Option<String>,
190        ota_version: OtaVersion,
191    }, // Associate device with update
192    OtaRequest {
193        device_id: String,
194        msg: OtaRequest,
195    },
196    OtaResponse(OtaUpdateVersioned),
197    OtaUpdateImageListRequest(), // Simple request to get all the firmware image information (id, name, desc, etc)
198    OtaUpdateImageListRequestResponse(OtaImageListResponse), // Message sent to show all the avilable OTA updates
199    OtaUpdateGroupListRequest(), // Simple request to get a list of all the groups with their memebers
200    OtaUpdateGroupListRequestResponse(OtaGroupListResponse), // Message sent to show all the avilable group info
201    ApplicationManagementRequest(ManagementData), // Message sent for configuration of application
202    ApplicationManagementResponse(ManagementData), // Reponse from application management portion of the app
203    ApplicationRequest(ApplicationData),           // Request/event from a device
204    ApplicationResponse(ApplicationData),          // Reponse from other parts of the server
205    InfluxDataSave(WriteQuery),                    // Takes a pre-prepared query and executes it
206    InfluxDataRequest(ReadQuery), // Takes a pre-prepared query to *read* the database
207    InfluxDataResponse,           // Is the response to InfluxDataRequest
208}