bulletin_board_server/
server.rs

1use std::io::Cursor;
2use std::net::TcpListener;
3
4#[cfg(target_family = "unix")]
5use std::os::unix::net::UnixListener;
6
7use crate::board::BulletinBoard;
8use crate::bulletin::Bulletin;
9use crate::error::{ArchiveError, BulletinError};
10use crate::logging;
11use crate::{
12    ACV_DIR, DEBUG, FILE_THRETHOLD, LISTEN_ADDR, LOG_FILE, LOG_LEVEL, TMP_DIR, TOT_MEM_LIMIT,
13};
14use bulletin_board_common::*;
15use serde_bytes::ByteBuf;
16use std::env;
17use std::fs;
18use std::io;
19use std::net::ToSocketAddrs;
20use std::path::Path;
21use std::sync::LazyLock;
22
23pub struct ServerOptions {
24    debug: bool,
25    listen_addr: Option<String>,
26    tmp_dir: Option<String>,
27    acv_dir: Option<String>,
28    tot_mem_limit: Option<String>,
29    file_threshold: Option<String>,
30    log_file: Option<String>,
31    log_level: Option<u8>,
32}
33
34impl ServerOptions {
35    pub fn new() -> Self {
36        Self {
37            debug: false,
38            listen_addr: None,
39            tmp_dir: None,
40            acv_dir: None,
41            tot_mem_limit: None,
42            file_threshold: None,
43            log_file: None,
44            log_level: None,
45        }
46    }
47    pub fn set_debug(&mut self) {
48        self.debug = true;
49    }
50    pub fn set_listen_addr(&mut self, listen_addr: String) {
51        self.listen_addr = Some(listen_addr);
52    }
53    pub fn set_tmp_dir(&mut self, tmp_dir: String) {
54        self.tmp_dir = Some(tmp_dir);
55    }
56    pub fn set_acv_dir(&mut self, acv_dir: String) {
57        self.acv_dir = Some(acv_dir);
58    }
59    pub fn set_tot_mem_limit(&mut self, tot_mem_limit: String) {
60        self.tot_mem_limit = Some(tot_mem_limit);
61    }
62    pub fn set_file_threshold(&mut self, file_threshold: String) {
63        self.file_threshold = Some(file_threshold);
64    }
65    pub fn set_log_file(&mut self, log_file: String) {
66        self.log_file = Some(log_file);
67    }
68    pub fn set_log_level(&mut self, log_level: u8) {
69        self.log_level = Some(log_level);
70    }
71    pub fn load_options(&self) {
72        if self.debug {
73            unsafe {
74                env::set_var("BB_DEBUG", "");
75            }
76        }
77        if let Some(listen_addr) = &self.listen_addr {
78            unsafe {
79                env::set_var("BB_LISTEN_ADDR", listen_addr);
80            }
81        }
82        if let Some(tmp_dir) = &self.tmp_dir {
83            unsafe {
84                env::set_var("BB_TMP_DIR", tmp_dir);
85            }
86        }
87        if let Some(acv_dir) = &self.acv_dir {
88            unsafe {
89                env::set_var("BB_ACV_DIR", acv_dir);
90            }
91        }
92        if let Some(tot_mem_limit) = &self.tot_mem_limit {
93            unsafe {
94                env::set_var("BB_TOT_MEM_LIMIT", tot_mem_limit);
95            }
96        }
97        if let Some(file_threshold) = &self.file_threshold {
98            unsafe {
99                env::set_var("BB_FILE_THRETHOLD", file_threshold);
100            }
101        }
102        if let Some(log_file) = &self.log_file {
103            unsafe {
104                env::set_var("BB_LOG_FILE", log_file);
105            }
106        }
107        if let Some(log_level) = &self.log_level {
108            unsafe {
109                env::set_var("BB_LOG_LEVEL", log_level.to_string());
110            }
111        }
112        LazyLock::force(&DEBUG);
113        LazyLock::force(&LISTEN_ADDR);
114        LazyLock::force(&TMP_DIR);
115        LazyLock::force(&ACV_DIR);
116        LazyLock::force(&TOT_MEM_LIMIT);
117        LazyLock::force(&FILE_THRETHOLD);
118        LazyLock::force(&LOG_FILE);
119        LazyLock::force(&LOG_LEVEL);
120    }
121}
122
123pub struct BBServer {
124    bulletinboard: BulletinBoard,
125    archive_manipulations: Vec<(String, Option<String>)>,
126}
127
128impl BBServer {
129    pub fn new() -> Result<Self, std::io::Error> {
130        if *LOG_LEVEL == 5 {
131            logging::warn("Server is running in verbose mode.".to_string());
132        }
133        Ok(Self {
134            bulletinboard: BulletinBoard::new()?,
135            archive_manipulations: vec![],
136        })
137    }
138    pub fn listen(&mut self) -> Result<(), std::io::Error> {
139        let ip = LISTEN_ADDR.to_socket_addrs();
140        #[cfg(not(target_family = "unix"))]
141        {
142            if ip.is_ok() {
143                self.listen_tcp()?;
144            } else {
145                return Err(io::Error::new(
146                    io::ErrorKind::AddrNotAvailable,
147                    "Address is invalid or not available.",
148                ));
149            }
150        }
151        #[cfg(target_family = "unix")]
152        {
153            if ip.is_ok() {
154                self.listen_tcp()?;
155            } else if !LISTEN_ADDR.contains(":") {
156                self.listen_unix()?;
157            } else {
158                return Err(io::Error::new(
159                    io::ErrorKind::AddrNotAvailable,
160                    "Address is invalid or not available.",
161                ));
162            }
163        }
164        Ok(())
165    }
166    fn listen_tcp(&mut self) -> Result<(), std::io::Error> {
167        {
168            let version = env!("CARGO_PKG_VERSION");
169            let message = format!("Bulletin Board Server v{version} started.");
170            logging::notice(message);
171
172            let message = format!("Listening on TCP socket: {}.", &*LISTEN_ADDR);
173            logging::info(message);
174        }
175        let listener = TcpListener::bind(&*LISTEN_ADDR)?;
176        for stream in listener.incoming() {
177            let stream = stream?;
178            match self.process(stream) {
179                Ok(exit) => {
180                    if exit {
181                        break;
182                    }
183                }
184                Err(err) => {
185                    let err = Box::leak(err);
186                    logging::error(err.to_string());
187                }
188            }
189        }
190        Ok(())
191    }
192    #[cfg(target_family = "unix")]
193    fn listen_unix(&mut self) -> Result<(), std::io::Error> {
194        if std::path::Path::new(&*LISTEN_ADDR).exists() {
195            std::fs::remove_file(&*LISTEN_ADDR)?;
196        }
197        {
198            let version = env!("CARGO_PKG_VERSION");
199            let message = format!("Bulletin Board Server v{version} started.");
200            logging::notice(message);
201
202            let message = format!("Listening on Unix socket: {}.", &*LISTEN_ADDR);
203            logging::info(message);
204        }
205        let listener = UnixListener::bind(&*LISTEN_ADDR)?;
206        for stream in listener.incoming() {
207            let stream = stream?;
208            match self.process(stream) {
209                Ok(exit) => {
210                    if exit {
211                        break;
212                    }
213                }
214                Err(err) => {
215                    let err = Box::leak(err);
216                    logging::error(err.to_string());
217                }
218            }
219        }
220        Ok(())
221    }
222    fn process<S: std::io::Read + std::io::Write>(
223        &mut self,
224        mut stream: S,
225    ) -> Result<bool, Box<dyn std::error::Error>> {
226        while let Ok(operation) = ciborium::from_reader(&mut stream) {
227            match operation {
228                Operation::Post => {
229                    self.post(&mut stream)?;
230                }
231                Operation::Read => {
232                    self.read(&mut stream)?;
233                }
234                Operation::Relabel => {
235                    self.relabel(&mut stream)?;
236                }
237                Operation::Version => {
238                    self.version(&mut stream)?;
239                }
240                Operation::Status => {
241                    self.status(&mut stream)?;
242                }
243                Operation::Log => {
244                    self.log(&mut stream)?;
245                }
246                Operation::ViewBoard => {
247                    self.view_board(&mut stream)?;
248                }
249                Operation::GetInfo => {
250                    self.get_info(&mut stream)?;
251                }
252                Operation::ClearRevisions => {
253                    self.clear_revisions(&mut stream)?;
254                }
255                Operation::Remove => {
256                    self.remove(&mut stream)?;
257                }
258                Operation::Archive => {
259                    self.archive(&mut stream)?;
260                }
261                Operation::Load => {
262                    self.load(&mut stream)?;
263                }
264                Operation::ListArchive => {
265                    self.list_archive(&mut stream)?;
266                }
267                Operation::RenameArchive => {
268                    self.rename_archive(&mut stream)?;
269                }
270                Operation::DeleteArchive => {
271                    self.delete_archive(&mut stream)?;
272                }
273                Operation::Dump => {
274                    self.dump(&mut stream)?;
275                }
276                Operation::Restore => {
277                    self.reset()?;
278                    self.restore(&mut stream)?;
279                }
280                Operation::ClearLog => {
281                    self.clear_log()?;
282                }
283                Operation::Reset => {
284                    self.reset()?;
285                }
286                Operation::Terminate => {
287                    self.reset()?;
288                    if !LISTEN_ADDR.to_socket_addrs().is_ok() {
289                        if std::path::Path::new(&*LISTEN_ADDR).exists() {
290                            std::fs::remove_file(&*LISTEN_ADDR)?;
291                        }
292                    }
293                    return Ok(true);
294                }
295            };
296        }
297        Ok(false)
298    }
299    fn get_tag<S: std::io::Read + std::io::Write>(
300        &self,
301        operation: &str,
302        title: &String,
303        tag: Option<String>,
304        stream: Option<&mut S>,
305    ) -> Result<String, Box<dyn std::error::Error>> {
306        match tag {
307            Some(tag) => Ok(tag),
308            None => {
309                let tags = self.bulletinboard.find_tags(title);
310                match tags.len() {
311                    0 => {
312                        if let Some(stream) = stream {
313                            ciborium::into_writer(&Response::NotFound, stream)?;
314                        }
315                        Err(Box::new(BulletinError::new(
316                            operation,
317                            "Not found.".to_string(),
318                            title.clone(),
319                            "NA".to_string(),
320                            None,
321                        )))
322                    }
323                    1 => Ok(tags[0].clone()),
324                    _ => {
325                        if let Some(stream) = stream {
326                            ciborium::into_writer(&Response::NotUnique(tags.clone()), stream)?;
327                        }
328                        Err(Box::new(BulletinError::new(
329                            operation,
330                            "Found multiple entries having the same name.".to_string(),
331                            title.clone(),
332                            "NA".to_string(),
333                            None,
334                        )))
335                    }
336                }
337            }
338        }
339    }
340    fn post<S: std::io::Read + std::io::Write>(
341        &mut self,
342        stream: &mut S,
343    ) -> Result<(), Box<dyn std::error::Error>> {
344        let (title, tag, data): (String, String, ByteBuf) = ciborium::from_reader(&mut *stream)?;
345        logging::debug(format!("(post) title: {title}, tag: {tag}."));
346        let bulletin = Bulletin::from_data(data.to_vec());
347        self.bulletinboard
348            .post(title.clone(), tag.clone(), bulletin)
349            .map_err(|err| BulletinError::new("post", err.to_string(), title, tag, None))?;
350        Ok(())
351    }
352    fn read<S: std::io::Read + std::io::Write>(
353        &mut self,
354        stream: &mut S,
355    ) -> Result<(), Box<dyn std::error::Error>> {
356        let (title, tag, revisions): (String, Option<String>, Vec<u64>) =
357            ciborium::from_reader(&mut *stream)?;
358        logging::debug(format!("(read) title: {title}, tag: {tag:?}."));
359        let tag = self.get_tag("read", &title, tag, Some(&mut *stream))?;
360        let mut buf = Cursor::new(vec![]);
361
362        if let Some(bulletins) = self.bulletinboard.take(title.clone(), tag.clone()) {
363            if revisions.is_empty() {
364                if let Some(bulletin) = bulletins.last_mut() {
365                    ciborium::into_writer(&Response::Ok, &mut buf)?;
366                    let data = bulletin.get()?;
367                    ciborium::into_writer(&ByteBuf::from(data), &mut buf)?;
368                    bulletin.close();
369                } else {
370                    ciborium::into_writer(&Response::NotFound, stream)?;
371                    return Err(Box::new(BulletinError::new(
372                        "read",
373                        "Not found.".to_string(),
374                        title,
375                        tag,
376                        None,
377                    )));
378                }
379            } else {
380                for revision in revisions {
381                    if let Some(bulletin) = bulletins.get_mut::<usize>(revision.try_into().unwrap())
382                    {
383                        ciborium::into_writer(&Response::Ok, &mut buf)?;
384                        let data = bulletin.get()?;
385                        ciborium::into_writer(&ByteBuf::from(data), &mut buf)?;
386                        bulletin.close();
387                    } else {
388                        ciborium::into_writer(&Response::NotFound, stream)?;
389                        return Err(Box::new(BulletinError::new(
390                            "read",
391                            "Not found.".to_string(),
392                            title,
393                            tag,
394                            None,
395                        )));
396                    }
397                }
398            }
399        } else {
400            ciborium::into_writer(&Response::NotFound, stream)?;
401            return Err(Box::new(BulletinError::new(
402                "read",
403                "Not found.".to_string(),
404                title,
405                tag,
406                None,
407            )));
408        };
409        buf.set_position(0);
410        io::copy(&mut buf, stream)?;
411
412        Ok(())
413    }
414    fn relabel<S: std::io::Read + std::io::Write>(
415        &mut self,
416        stream: &mut S,
417    ) -> Result<(), Box<dyn std::error::Error>> {
418        let (title_from, tag_from, title_to, tag_to): (
419            String,
420            Option<String>,
421            Option<String>,
422            Option<String>,
423        ) = ciborium::from_reader(&mut *stream)?;
424        logging::debug(format!(
425            "(relabel) title_from: {title_from}, tag_from: {tag_from:?}, title_to: {title_to:?}, tag_to: {tag_to:?}."
426        ));
427        let tag_from = self.get_tag("read", &title_from, tag_from, Some(&mut *stream))?;
428        self.bulletinboard
429            .relabel(title_from, tag_from, title_to, tag_to)?;
430        Ok(())
431    }
432    fn version<S: std::io::Read + std::io::Write>(
433        &self,
434        stream: &mut S,
435    ) -> Result<(), Box<dyn std::error::Error>> {
436        logging::debug(format!("(version)."));
437        let version = env!("CARGO_PKG_VERSION").to_string();
438        ciborium::into_writer(&version, stream)?;
439        Ok(())
440    }
441    fn status<S: std::io::Read + std::io::Write>(
442        &self,
443        stream: &mut S,
444    ) -> Result<(), Box<dyn std::error::Error>> {
445        logging::debug(format!("(status)."));
446        let status = self.bulletinboard.status();
447        ciborium::into_writer(&status, stream)?;
448        Ok(())
449    }
450    fn log<S: std::io::Read + std::io::Write>(
451        &self,
452        stream: &mut S,
453    ) -> Result<(), Box<dyn std::error::Error>> {
454        logging::debug(format!("(log)."));
455        let log = if Path::new(&*LOG_FILE).exists() {
456            std::fs::read_to_string(&*LOG_FILE)?
457        } else {
458            "No logs yet.\n".to_string()
459        };
460        ciborium::into_writer(&log, stream)?;
461        Ok(())
462    }
463    fn view_board<S: std::io::Read + std::io::Write>(
464        &self,
465        stream: &mut S,
466    ) -> Result<(), Box<dyn std::error::Error>> {
467        logging::debug(format!("(view_board)."));
468        let board = self.bulletinboard.view();
469        ciborium::into_writer(&board, stream)?;
470        Ok(())
471    }
472    fn get_info<S: std::io::Read + std::io::Write>(
473        &self,
474        stream: &mut S,
475    ) -> Result<(), Box<dyn std::error::Error>> {
476        let (title, tag): (String, Option<String>) = ciborium::from_reader(&mut *stream)?;
477        logging::debug(format!("(get_info) title: {title}, tag: {tag:?}."));
478        let tag = self.get_tag("get_info", &title, tag, Some(&mut *stream))?;
479        match self.bulletinboard.get_info(title.clone(), tag.clone()) {
480            Some(info) => {
481                let mut buf = Cursor::new(vec![]);
482                ciborium::into_writer(&Response::Ok, &mut buf)?;
483                ciborium::into_writer(&info, &mut buf)?;
484                buf.set_position(0);
485                io::copy(&mut buf, stream)?;
486            }
487            None => {
488                ciborium::into_writer(&Response::NotFound, stream)?;
489                return Err(Box::new(BulletinError::new(
490                    "get_info",
491                    "Not found.".to_string(),
492                    title,
493                    tag,
494                    None,
495                )));
496            }
497        }
498        Ok(())
499    }
500    fn clear_revisions<S: std::io::Read + std::io::Write>(
501        &mut self,
502        stream: &mut S,
503    ) -> Result<(), Box<dyn std::error::Error>> {
504        let (title, tag, revisions): (String, Option<String>, Vec<u64>) =
505            ciborium::from_reader(stream)?;
506        logging::debug(format!(
507            "(clear_revisions) title: {title}, tag: {tag:?}, revisions: {revisions:?}."
508        ));
509        let tag = self.get_tag("clear_revisions", &title, tag, None::<&mut S>)?;
510        self.bulletinboard
511            .clear_revisions(title.clone(), tag.clone(), revisions)
512            .map_err(|err| {
513                Box::new(BulletinError::new(
514                    "clear_revisions",
515                    err.to_string(),
516                    title,
517                    tag,
518                    None,
519                ))
520            })?;
521        Ok(())
522    }
523    fn remove<S: std::io::Read + std::io::Write>(
524        &mut self,
525        stream: &mut S,
526    ) -> Result<(), Box<dyn std::error::Error>> {
527        let (title, tag): (String, Option<String>) = ciborium::from_reader(stream)?;
528        logging::debug(format!("(remove) title: {title}, tag: {tag:?}."));
529        let tag = self.get_tag("remove", &title, tag, None::<&mut S>)?;
530        self.bulletinboard
531            .remove(title.clone(), tag.clone())
532            .map_err(|err| {
533                Box::new(BulletinError::new(
534                    "remove",
535                    err.to_string(),
536                    title,
537                    tag,
538                    None,
539                ))
540            })?;
541        Ok(())
542    }
543    fn archive<S: std::io::Read + std::io::Write>(
544        &mut self,
545        stream: &mut S,
546    ) -> Result<(), Box<dyn std::error::Error>> {
547        let (acv_name, title, tag): (String, String, Option<String>) =
548            ciborium::from_reader(stream)?;
549        logging::debug(format!(
550            "(arvhive) archive_name: {acv_name}, title: {title}, tag: {tag:?}."
551        ));
552        if acv_name.len() == 0 {
553            return Err(Box::new(ArchiveError::new(
554                "archive",
555                "Wrong archive name.".to_string(),
556                acv_name.clone(),
557            )));
558        }
559        let tag = self.get_tag("archive", &title, tag, None::<&mut S>)?;
560        self.bulletinboard
561            .archive(acv_name, title.clone(), tag.clone())
562            .map_err(|err| {
563                Box::new(BulletinError::new(
564                    "archive",
565                    err.to_string(),
566                    title,
567                    tag,
568                    None,
569                ))
570            })?;
571        Ok(())
572    }
573    fn load<S: std::io::Read + std::io::Write>(
574        &mut self,
575        stream: &mut S,
576    ) -> Result<(), Box<dyn std::error::Error>> {
577        let acv_name: String = ciborium::from_reader(stream)?;
578        logging::debug(format!("(load) archive_name: {acv_name}."));
579        if acv_name.len() == 0 {
580            return Err(Box::new(ArchiveError::new(
581                "load",
582                "Wrong archive name.".to_string(),
583                acv_name.clone(),
584            )));
585        }
586        self.bulletinboard
587            .load(acv_name.clone())
588            .map_err(|err| ArchiveError::new("load", err.to_string(), acv_name))?;
589        Ok(())
590    }
591    fn list_archive<S: std::io::Read + std::io::Write>(
592        &self,
593        stream: &mut S,
594    ) -> Result<(), Box<dyn std::error::Error>> {
595        logging::debug(format!("(list_archive)."));
596        match self.bulletinboard.list_archive() {
597            Ok(list) => {
598                ciborium::into_writer(&list, stream)?;
599            }
600            Err(_) => {
601                let empty: Vec<String> = vec![];
602                ciborium::into_writer(&empty, stream)?;
603            }
604        }
605        Ok(())
606    }
607    fn rename_archive<S: std::io::Read + std::io::Write>(
608        &mut self,
609        stream: &mut S,
610    ) -> Result<(), Box<dyn std::error::Error>> {
611        let (acv_from, acv_to): (String, String) = ciborium::from_reader(stream)?;
612        logging::debug(format!("(rename_archive) from: {acv_from}, to: {acv_to}."));
613        if acv_from.len() == 0 {
614            return Err(Box::new(ArchiveError::new(
615                "rename_archive",
616                "Wrong archive name.".to_string(),
617                acv_from.clone(),
618            )));
619        }
620        if acv_to.len() == 0 {
621            return Err(Box::new(ArchiveError::new(
622                "rename_archive",
623                "Wrong archive name.".to_string(),
624                acv_to.clone(),
625            )));
626        }
627        self.archive_manipulations.push((acv_from, Some(acv_to)));
628        Ok(())
629    }
630    fn delete_archive<S: std::io::Read + std::io::Write>(
631        &mut self,
632        stream: &mut S,
633    ) -> Result<(), Box<dyn std::error::Error>> {
634        let acv_name: String = ciborium::from_reader(stream)?;
635        logging::debug(format!("(delete_archive) archive_name: {acv_name}."));
636        if acv_name.len() == 0 {
637            return Err(Box::new(ArchiveError::new(
638                "delete_archive",
639                "Wrong archive name.".to_string(),
640                acv_name.clone(),
641            )));
642        }
643        self.archive_manipulations.push((acv_name, None));
644        Ok(())
645    }
646    fn dump<S: std::io::Read + std::io::Write>(
647        &mut self,
648        stream: &mut S,
649    ) -> Result<(), Box<dyn std::error::Error>> {
650        let acv_name: String = ciborium::from_reader(stream)?;
651        logging::debug(format!("(dump) archive_name: {acv_name}."));
652        if acv_name.len() == 0 {
653            return Err(Box::new(ArchiveError::new(
654                "dump",
655                "Wrong archive name.".to_string(),
656                acv_name.clone(),
657            )));
658        }
659        self.bulletinboard.dump(acv_name)?;
660        Ok(())
661    }
662    fn restore<S: std::io::Read + std::io::Write>(
663        &mut self,
664        stream: &mut S,
665    ) -> Result<(), Box<dyn std::error::Error>> {
666        let acv_name: String = ciborium::from_reader(stream)?;
667        logging::debug(format!("(restore) archive_name: {acv_name}."));
668        if acv_name.len() == 0 {
669            return Err(Box::new(ArchiveError::new(
670                "restore",
671                "Wrong archive name.".to_string(),
672                acv_name.clone(),
673            )));
674        }
675        self.bulletinboard
676            .restore(acv_name.clone())
677            .map_err(|err| ArchiveError::new("restore", err.to_string(), acv_name))?;
678        Ok(())
679    }
680    fn clear_log(&self) -> Result<(), Box<dyn std::error::Error>> {
681        logging::debug(format!("(clear_log)."));
682        if Path::new(&*LOG_FILE).exists() {
683            fs::remove_file(&*LOG_FILE)?;
684        }
685        Ok(())
686    }
687    fn reset(&mut self) -> Result<(), Box<dyn std::error::Error>> {
688        logging::debug(format!("(restore/reset/exit)."));
689        self.bulletinboard.reset()?;
690        for (name_from, name_to) in self.archive_manipulations.drain(..) {
691            match name_to {
692                Some(name_to) => {
693                    self.bulletinboard
694                        .rename_archive(name_from.clone(), name_to.clone())?;
695                    logging::info(format!("Moved archive: {name_from} => {name_to}."));
696                }
697                None => {
698                    self.bulletinboard.delete_archive(name_from.clone())?;
699                    logging::info(format!("Deleted archive: {name_from}."));
700                }
701            }
702        }
703        logging::notice("Server restarted.".to_string());
704        Ok(())
705    }
706}