1use std::io::Cursor;
2use std::net::TcpListener;
3
4#[cfg(target_family = "unix")]
5use std::os::unix::net::UnixListener;
6
7use crate::board::BulletinBoard;
8use crate::bulletin::Bulletin;
9use crate::error::{ArchiveError, BulletinError};
10use crate::logging;
11use crate::{
12 ACV_DIR, DEBUG, FILE_THRETHOLD, LISTEN_ADDR, LOG_FILE, LOG_LEVEL, TMP_DIR, TOT_MEM_LIMIT,
13};
14use bulletin_board_common::*;
15use serde_bytes::ByteBuf;
16use std::env;
17use std::fs;
18use std::io;
19use std::net::ToSocketAddrs;
20use std::path::Path;
21use std::sync::LazyLock;
22
23pub struct ServerOptions {
24 debug: bool,
25 listen_addr: Option<String>,
26 tmp_dir: Option<String>,
27 acv_dir: Option<String>,
28 tot_mem_limit: Option<String>,
29 file_threshold: Option<String>,
30 log_file: Option<String>,
31 log_level: Option<u8>,
32}
33
34impl ServerOptions {
35 pub fn new() -> Self {
36 Self {
37 debug: false,
38 listen_addr: None,
39 tmp_dir: None,
40 acv_dir: None,
41 tot_mem_limit: None,
42 file_threshold: None,
43 log_file: None,
44 log_level: None,
45 }
46 }
47 pub fn set_debug(&mut self) {
48 self.debug = true;
49 }
50 pub fn set_listen_addr(&mut self, listen_addr: String) {
51 self.listen_addr = Some(listen_addr);
52 }
53 pub fn set_tmp_dir(&mut self, tmp_dir: String) {
54 self.tmp_dir = Some(tmp_dir);
55 }
56 pub fn set_acv_dir(&mut self, acv_dir: String) {
57 self.acv_dir = Some(acv_dir);
58 }
59 pub fn set_tot_mem_limit(&mut self, tot_mem_limit: String) {
60 self.tot_mem_limit = Some(tot_mem_limit);
61 }
62 pub fn set_file_threshold(&mut self, file_threshold: String) {
63 self.file_threshold = Some(file_threshold);
64 }
65 pub fn set_log_file(&mut self, log_file: String) {
66 self.log_file = Some(log_file);
67 }
68 pub fn set_log_level(&mut self, log_level: u8) {
69 self.log_level = Some(log_level);
70 }
71 pub fn load_options(&self) {
72 if self.debug {
73 unsafe {
74 env::set_var("BB_DEBUG", "");
75 }
76 }
77 if let Some(listen_addr) = &self.listen_addr {
78 unsafe {
79 env::set_var("BB_LISTEN_ADDR", listen_addr);
80 }
81 }
82 if let Some(tmp_dir) = &self.tmp_dir {
83 unsafe {
84 env::set_var("BB_TMP_DIR", tmp_dir);
85 }
86 }
87 if let Some(acv_dir) = &self.acv_dir {
88 unsafe {
89 env::set_var("BB_ACV_DIR", acv_dir);
90 }
91 }
92 if let Some(tot_mem_limit) = &self.tot_mem_limit {
93 unsafe {
94 env::set_var("BB_TOT_MEM_LIMIT", tot_mem_limit);
95 }
96 }
97 if let Some(file_threshold) = &self.file_threshold {
98 unsafe {
99 env::set_var("BB_FILE_THRETHOLD", file_threshold);
100 }
101 }
102 if let Some(log_file) = &self.log_file {
103 unsafe {
104 env::set_var("BB_LOG_FILE", log_file);
105 }
106 }
107 if let Some(log_level) = &self.log_level {
108 unsafe {
109 env::set_var("BB_LOG_LEVEL", log_level.to_string());
110 }
111 }
112 LazyLock::force(&DEBUG);
113 LazyLock::force(&LISTEN_ADDR);
114 LazyLock::force(&TMP_DIR);
115 LazyLock::force(&ACV_DIR);
116 LazyLock::force(&TOT_MEM_LIMIT);
117 LazyLock::force(&FILE_THRETHOLD);
118 LazyLock::force(&LOG_FILE);
119 LazyLock::force(&LOG_LEVEL);
120 }
121}
122
123pub struct BBServer {
124 bulletinboard: BulletinBoard,
125 archive_manipulations: Vec<(String, Option<String>)>,
126}
127
128impl BBServer {
129 pub fn new() -> Result<Self, std::io::Error> {
130 if *LOG_LEVEL == 5 {
131 logging::warn("Server is running in verbose mode.".to_string());
132 }
133 Ok(Self {
134 bulletinboard: BulletinBoard::new()?,
135 archive_manipulations: vec![],
136 })
137 }
138 pub fn listen(&mut self) -> Result<(), std::io::Error> {
139 let ip = LISTEN_ADDR.to_socket_addrs();
140 #[cfg(not(target_family = "unix"))]
141 {
142 if ip.is_ok() {
143 self.listen_tcp()?;
144 } else {
145 return Err(io::Error::new(
146 io::ErrorKind::AddrNotAvailable,
147 "Address is invalid or not available.",
148 ));
149 }
150 }
151 #[cfg(target_family = "unix")]
152 {
153 if ip.is_ok() {
154 self.listen_tcp()?;
155 } else if !LISTEN_ADDR.contains(":") {
156 self.listen_unix()?;
157 } else {
158 return Err(io::Error::new(
159 io::ErrorKind::AddrNotAvailable,
160 "Address is invalid or not available.",
161 ));
162 }
163 }
164 Ok(())
165 }
166 fn listen_tcp(&mut self) -> Result<(), std::io::Error> {
167 {
168 let version = env!("CARGO_PKG_VERSION");
169 let message = format!("Bulletin Board Server v{version} started.");
170 logging::notice(message);
171
172 let message = format!("Listening on TCP socket: {}.", &*LISTEN_ADDR);
173 logging::info(message);
174 }
175 let listener = TcpListener::bind(&*LISTEN_ADDR)?;
176 for stream in listener.incoming() {
177 let stream = stream?;
178 match self.process(stream) {
179 Ok(exit) => {
180 if exit {
181 break;
182 }
183 }
184 Err(err) => {
185 let err = Box::leak(err);
186 logging::error(err.to_string());
187 }
188 }
189 }
190 Ok(())
191 }
192 #[cfg(target_family = "unix")]
193 fn listen_unix(&mut self) -> Result<(), std::io::Error> {
194 if std::path::Path::new(&*LISTEN_ADDR).exists() {
195 std::fs::remove_file(&*LISTEN_ADDR)?;
196 }
197 {
198 let version = env!("CARGO_PKG_VERSION");
199 let message = format!("Bulletin Board Server v{version} started.");
200 logging::notice(message);
201
202 let message = format!("Listening on Unix socket: {}.", &*LISTEN_ADDR);
203 logging::info(message);
204 }
205 let listener = UnixListener::bind(&*LISTEN_ADDR)?;
206 for stream in listener.incoming() {
207 let stream = stream?;
208 match self.process(stream) {
209 Ok(exit) => {
210 if exit {
211 break;
212 }
213 }
214 Err(err) => {
215 let err = Box::leak(err);
216 logging::error(err.to_string());
217 }
218 }
219 }
220 Ok(())
221 }
222 fn process<S: std::io::Read + std::io::Write>(
223 &mut self,
224 mut stream: S,
225 ) -> Result<bool, Box<dyn std::error::Error>> {
226 while let Ok(operation) = ciborium::from_reader(&mut stream) {
227 match operation {
228 Operation::Post => {
229 self.post(&mut stream)?;
230 }
231 Operation::Read => {
232 self.read(&mut stream)?;
233 }
234 Operation::Relabel => {
235 self.relabel(&mut stream)?;
236 }
237 Operation::Version => {
238 self.version(&mut stream)?;
239 }
240 Operation::Status => {
241 self.status(&mut stream)?;
242 }
243 Operation::Log => {
244 self.log(&mut stream)?;
245 }
246 Operation::ViewBoard => {
247 self.view_board(&mut stream)?;
248 }
249 Operation::GetInfo => {
250 self.get_info(&mut stream)?;
251 }
252 Operation::ClearRevisions => {
253 self.clear_revisions(&mut stream)?;
254 }
255 Operation::Remove => {
256 self.remove(&mut stream)?;
257 }
258 Operation::Archive => {
259 self.archive(&mut stream)?;
260 }
261 Operation::Load => {
262 self.load(&mut stream)?;
263 }
264 Operation::ListArchive => {
265 self.list_archive(&mut stream)?;
266 }
267 Operation::RenameArchive => {
268 self.rename_archive(&mut stream)?;
269 }
270 Operation::DeleteArchive => {
271 self.delete_archive(&mut stream)?;
272 }
273 Operation::Dump => {
274 self.dump(&mut stream)?;
275 }
276 Operation::Restore => {
277 self.reset()?;
278 self.restore(&mut stream)?;
279 }
280 Operation::ClearLog => {
281 self.clear_log()?;
282 }
283 Operation::Reset => {
284 self.reset()?;
285 }
286 Operation::Terminate => {
287 self.reset()?;
288 if !LISTEN_ADDR.to_socket_addrs().is_ok() {
289 if std::path::Path::new(&*LISTEN_ADDR).exists() {
290 std::fs::remove_file(&*LISTEN_ADDR)?;
291 }
292 }
293 return Ok(true);
294 }
295 };
296 }
297 Ok(false)
298 }
299 fn get_tag<S: std::io::Read + std::io::Write>(
300 &self,
301 operation: &str,
302 title: &String,
303 tag: Option<String>,
304 stream: Option<&mut S>,
305 ) -> Result<String, Box<dyn std::error::Error>> {
306 match tag {
307 Some(tag) => Ok(tag),
308 None => {
309 let tags = self.bulletinboard.find_tags(title);
310 match tags.len() {
311 0 => {
312 if let Some(stream) = stream {
313 ciborium::into_writer(&Response::NotFound, stream)?;
314 }
315 Err(Box::new(BulletinError::new(
316 operation,
317 "Not found.".to_string(),
318 title.clone(),
319 "NA".to_string(),
320 None,
321 )))
322 }
323 1 => Ok(tags[0].clone()),
324 _ => {
325 if let Some(stream) = stream {
326 ciborium::into_writer(&Response::NotUnique(tags.clone()), stream)?;
327 }
328 Err(Box::new(BulletinError::new(
329 operation,
330 "Found multiple entries having the same name.".to_string(),
331 title.clone(),
332 "NA".to_string(),
333 None,
334 )))
335 }
336 }
337 }
338 }
339 }
340 fn post<S: std::io::Read + std::io::Write>(
341 &mut self,
342 stream: &mut S,
343 ) -> Result<(), Box<dyn std::error::Error>> {
344 let (title, tag, data): (String, String, ByteBuf) = ciborium::from_reader(&mut *stream)?;
345 logging::debug(format!("(post) title: {title}, tag: {tag}."));
346 let bulletin = Bulletin::from_data(data.to_vec());
347 self.bulletinboard
348 .post(title.clone(), tag.clone(), bulletin)
349 .map_err(|err| BulletinError::new("post", err.to_string(), title, tag, None))?;
350 Ok(())
351 }
352 fn read<S: std::io::Read + std::io::Write>(
353 &mut self,
354 stream: &mut S,
355 ) -> Result<(), Box<dyn std::error::Error>> {
356 let (title, tag, revisions): (String, Option<String>, Vec<u64>) =
357 ciborium::from_reader(&mut *stream)?;
358 logging::debug(format!("(read) title: {title}, tag: {tag:?}."));
359 let tag = self.get_tag("read", &title, tag, Some(&mut *stream))?;
360 let mut buf = Cursor::new(vec![]);
361
362 if let Some(bulletins) = self.bulletinboard.take(title.clone(), tag.clone()) {
363 if revisions.is_empty() {
364 if let Some(bulletin) = bulletins.last_mut() {
365 ciborium::into_writer(&Response::Ok, &mut buf)?;
366 let data = bulletin.get()?;
367 ciborium::into_writer(&ByteBuf::from(data), &mut buf)?;
368 bulletin.close();
369 } else {
370 ciborium::into_writer(&Response::NotFound, stream)?;
371 return Err(Box::new(BulletinError::new(
372 "read",
373 "Not found.".to_string(),
374 title,
375 tag,
376 None,
377 )));
378 }
379 } else {
380 for revision in revisions {
381 if let Some(bulletin) = bulletins.get_mut::<usize>(revision.try_into().unwrap())
382 {
383 ciborium::into_writer(&Response::Ok, &mut buf)?;
384 let data = bulletin.get()?;
385 ciborium::into_writer(&ByteBuf::from(data), &mut buf)?;
386 bulletin.close();
387 } else {
388 ciborium::into_writer(&Response::NotFound, stream)?;
389 return Err(Box::new(BulletinError::new(
390 "read",
391 "Not found.".to_string(),
392 title,
393 tag,
394 None,
395 )));
396 }
397 }
398 }
399 } else {
400 ciborium::into_writer(&Response::NotFound, stream)?;
401 return Err(Box::new(BulletinError::new(
402 "read",
403 "Not found.".to_string(),
404 title,
405 tag,
406 None,
407 )));
408 };
409 buf.set_position(0);
410 io::copy(&mut buf, stream)?;
411
412 Ok(())
413 }
414 fn relabel<S: std::io::Read + std::io::Write>(
415 &mut self,
416 stream: &mut S,
417 ) -> Result<(), Box<dyn std::error::Error>> {
418 let (title_from, tag_from, title_to, tag_to): (
419 String,
420 Option<String>,
421 Option<String>,
422 Option<String>,
423 ) = ciborium::from_reader(&mut *stream)?;
424 logging::debug(format!(
425 "(relabel) title_from: {title_from}, tag_from: {tag_from:?}, title_to: {title_to:?}, tag_to: {tag_to:?}."
426 ));
427 let tag_from = self.get_tag("read", &title_from, tag_from, Some(&mut *stream))?;
428 self.bulletinboard
429 .relabel(title_from, tag_from, title_to, tag_to)?;
430 Ok(())
431 }
432 fn version<S: std::io::Read + std::io::Write>(
433 &self,
434 stream: &mut S,
435 ) -> Result<(), Box<dyn std::error::Error>> {
436 logging::debug(format!("(version)."));
437 let version = env!("CARGO_PKG_VERSION").to_string();
438 ciborium::into_writer(&version, stream)?;
439 Ok(())
440 }
441 fn status<S: std::io::Read + std::io::Write>(
442 &self,
443 stream: &mut S,
444 ) -> Result<(), Box<dyn std::error::Error>> {
445 logging::debug(format!("(status)."));
446 let status = self.bulletinboard.status();
447 ciborium::into_writer(&status, stream)?;
448 Ok(())
449 }
450 fn log<S: std::io::Read + std::io::Write>(
451 &self,
452 stream: &mut S,
453 ) -> Result<(), Box<dyn std::error::Error>> {
454 logging::debug(format!("(log)."));
455 let log = if Path::new(&*LOG_FILE).exists() {
456 std::fs::read_to_string(&*LOG_FILE)?
457 } else {
458 "No logs yet.\n".to_string()
459 };
460 ciborium::into_writer(&log, stream)?;
461 Ok(())
462 }
463 fn view_board<S: std::io::Read + std::io::Write>(
464 &self,
465 stream: &mut S,
466 ) -> Result<(), Box<dyn std::error::Error>> {
467 logging::debug(format!("(view_board)."));
468 let board = self.bulletinboard.view();
469 ciborium::into_writer(&board, stream)?;
470 Ok(())
471 }
472 fn get_info<S: std::io::Read + std::io::Write>(
473 &self,
474 stream: &mut S,
475 ) -> Result<(), Box<dyn std::error::Error>> {
476 let (title, tag): (String, Option<String>) = ciborium::from_reader(&mut *stream)?;
477 logging::debug(format!("(get_info) title: {title}, tag: {tag:?}."));
478 let tag = self.get_tag("get_info", &title, tag, Some(&mut *stream))?;
479 match self.bulletinboard.get_info(title.clone(), tag.clone()) {
480 Some(info) => {
481 let mut buf = Cursor::new(vec![]);
482 ciborium::into_writer(&Response::Ok, &mut buf)?;
483 ciborium::into_writer(&info, &mut buf)?;
484 buf.set_position(0);
485 io::copy(&mut buf, stream)?;
486 }
487 None => {
488 ciborium::into_writer(&Response::NotFound, stream)?;
489 return Err(Box::new(BulletinError::new(
490 "get_info",
491 "Not found.".to_string(),
492 title,
493 tag,
494 None,
495 )));
496 }
497 }
498 Ok(())
499 }
500 fn clear_revisions<S: std::io::Read + std::io::Write>(
501 &mut self,
502 stream: &mut S,
503 ) -> Result<(), Box<dyn std::error::Error>> {
504 let (title, tag, revisions): (String, Option<String>, Vec<u64>) =
505 ciborium::from_reader(stream)?;
506 logging::debug(format!(
507 "(clear_revisions) title: {title}, tag: {tag:?}, revisions: {revisions:?}."
508 ));
509 let tag = self.get_tag("clear_revisions", &title, tag, None::<&mut S>)?;
510 self.bulletinboard
511 .clear_revisions(title.clone(), tag.clone(), revisions)
512 .map_err(|err| {
513 Box::new(BulletinError::new(
514 "clear_revisions",
515 err.to_string(),
516 title,
517 tag,
518 None,
519 ))
520 })?;
521 Ok(())
522 }
523 fn remove<S: std::io::Read + std::io::Write>(
524 &mut self,
525 stream: &mut S,
526 ) -> Result<(), Box<dyn std::error::Error>> {
527 let (title, tag): (String, Option<String>) = ciborium::from_reader(stream)?;
528 logging::debug(format!("(remove) title: {title}, tag: {tag:?}."));
529 let tag = self.get_tag("remove", &title, tag, None::<&mut S>)?;
530 self.bulletinboard
531 .remove(title.clone(), tag.clone())
532 .map_err(|err| {
533 Box::new(BulletinError::new(
534 "remove",
535 err.to_string(),
536 title,
537 tag,
538 None,
539 ))
540 })?;
541 Ok(())
542 }
543 fn archive<S: std::io::Read + std::io::Write>(
544 &mut self,
545 stream: &mut S,
546 ) -> Result<(), Box<dyn std::error::Error>> {
547 let (acv_name, title, tag): (String, String, Option<String>) =
548 ciborium::from_reader(stream)?;
549 logging::debug(format!(
550 "(arvhive) archive_name: {acv_name}, title: {title}, tag: {tag:?}."
551 ));
552 if acv_name.len() == 0 {
553 return Err(Box::new(ArchiveError::new(
554 "archive",
555 "Wrong archive name.".to_string(),
556 acv_name.clone(),
557 )));
558 }
559 let tag = self.get_tag("archive", &title, tag, None::<&mut S>)?;
560 self.bulletinboard
561 .archive(acv_name, title.clone(), tag.clone())
562 .map_err(|err| {
563 Box::new(BulletinError::new(
564 "archive",
565 err.to_string(),
566 title,
567 tag,
568 None,
569 ))
570 })?;
571 Ok(())
572 }
573 fn load<S: std::io::Read + std::io::Write>(
574 &mut self,
575 stream: &mut S,
576 ) -> Result<(), Box<dyn std::error::Error>> {
577 let acv_name: String = ciborium::from_reader(stream)?;
578 logging::debug(format!("(load) archive_name: {acv_name}."));
579 if acv_name.len() == 0 {
580 return Err(Box::new(ArchiveError::new(
581 "load",
582 "Wrong archive name.".to_string(),
583 acv_name.clone(),
584 )));
585 }
586 self.bulletinboard
587 .load(acv_name.clone())
588 .map_err(|err| ArchiveError::new("load", err.to_string(), acv_name))?;
589 Ok(())
590 }
591 fn list_archive<S: std::io::Read + std::io::Write>(
592 &self,
593 stream: &mut S,
594 ) -> Result<(), Box<dyn std::error::Error>> {
595 logging::debug(format!("(list_archive)."));
596 match self.bulletinboard.list_archive() {
597 Ok(list) => {
598 ciborium::into_writer(&list, stream)?;
599 }
600 Err(_) => {
601 let empty: Vec<String> = vec![];
602 ciborium::into_writer(&empty, stream)?;
603 }
604 }
605 Ok(())
606 }
607 fn rename_archive<S: std::io::Read + std::io::Write>(
608 &mut self,
609 stream: &mut S,
610 ) -> Result<(), Box<dyn std::error::Error>> {
611 let (acv_from, acv_to): (String, String) = ciborium::from_reader(stream)?;
612 logging::debug(format!("(rename_archive) from: {acv_from}, to: {acv_to}."));
613 if acv_from.len() == 0 {
614 return Err(Box::new(ArchiveError::new(
615 "rename_archive",
616 "Wrong archive name.".to_string(),
617 acv_from.clone(),
618 )));
619 }
620 if acv_to.len() == 0 {
621 return Err(Box::new(ArchiveError::new(
622 "rename_archive",
623 "Wrong archive name.".to_string(),
624 acv_to.clone(),
625 )));
626 }
627 self.archive_manipulations.push((acv_from, Some(acv_to)));
628 Ok(())
629 }
630 fn delete_archive<S: std::io::Read + std::io::Write>(
631 &mut self,
632 stream: &mut S,
633 ) -> Result<(), Box<dyn std::error::Error>> {
634 let acv_name: String = ciborium::from_reader(stream)?;
635 logging::debug(format!("(delete_archive) archive_name: {acv_name}."));
636 if acv_name.len() == 0 {
637 return Err(Box::new(ArchiveError::new(
638 "delete_archive",
639 "Wrong archive name.".to_string(),
640 acv_name.clone(),
641 )));
642 }
643 self.archive_manipulations.push((acv_name, None));
644 Ok(())
645 }
646 fn dump<S: std::io::Read + std::io::Write>(
647 &mut self,
648 stream: &mut S,
649 ) -> Result<(), Box<dyn std::error::Error>> {
650 let acv_name: String = ciborium::from_reader(stream)?;
651 logging::debug(format!("(dump) archive_name: {acv_name}."));
652 if acv_name.len() == 0 {
653 return Err(Box::new(ArchiveError::new(
654 "dump",
655 "Wrong archive name.".to_string(),
656 acv_name.clone(),
657 )));
658 }
659 self.bulletinboard.dump(acv_name)?;
660 Ok(())
661 }
662 fn restore<S: std::io::Read + std::io::Write>(
663 &mut self,
664 stream: &mut S,
665 ) -> Result<(), Box<dyn std::error::Error>> {
666 let acv_name: String = ciborium::from_reader(stream)?;
667 logging::debug(format!("(restore) archive_name: {acv_name}."));
668 if acv_name.len() == 0 {
669 return Err(Box::new(ArchiveError::new(
670 "restore",
671 "Wrong archive name.".to_string(),
672 acv_name.clone(),
673 )));
674 }
675 self.bulletinboard
676 .restore(acv_name.clone())
677 .map_err(|err| ArchiveError::new("restore", err.to_string(), acv_name))?;
678 Ok(())
679 }
680 fn clear_log(&self) -> Result<(), Box<dyn std::error::Error>> {
681 logging::debug(format!("(clear_log)."));
682 if Path::new(&*LOG_FILE).exists() {
683 fs::remove_file(&*LOG_FILE)?;
684 }
685 Ok(())
686 }
687 fn reset(&mut self) -> Result<(), Box<dyn std::error::Error>> {
688 logging::debug(format!("(restore/reset/exit)."));
689 self.bulletinboard.reset()?;
690 for (name_from, name_to) in self.archive_manipulations.drain(..) {
691 match name_to {
692 Some(name_to) => {
693 self.bulletinboard
694 .rename_archive(name_from.clone(), name_to.clone())?;
695 logging::info(format!("Moved archive: {name_from} => {name_to}."));
696 }
697 None => {
698 self.bulletinboard.delete_archive(name_from.clone())?;
699 logging::info(format!("Deleted archive: {name_from}."));
700 }
701 }
702 }
703 logging::notice("Server restarted.".to_string());
704 Ok(())
705 }
706}