use std::fmt::Display;
use std::fs::{self};
use std::hash::Hash;
use std::io::{ErrorKind, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use deepsize::DeepSizeOf;
use serde::{Deserialize, Serialize};
use tracing::{debug, error, info, trace, warn};
use crate::errors::StoreError;
use crate::records::{Check, CheckType, TARGETS};
use crate::DAEMON_USER;
#[cfg(feature = "compression")]
use zstd;
/// File name of the store database on disk.
pub const DB_NAME: &str = "netpulse.store";
/// Default directory that contains the store database.
pub const DB_PATH: &str = "/var/lib/netpulse";
#[cfg(feature = "compression")]
/// Compression level handed to zstd when writing the store.
pub const ZSTD_COMPRESSION_LEVEL: i32 = 4;
/// Environment variable that overrides the store directory (see [`Store::path`]).
pub const ENV_PATH: &str = "NETPULSE_STORE_PATH";
/// Default number of seconds between checks (see [`Store::period_seconds`]).
pub const DEFAULT_PERIOD: i64 = 60;
// NOTE(review): not used in this file — presumably the window used for outage
// detection elsewhere; confirm at the usage site.
pub const OUTAGE_TIME_SPAN: i64 = DEFAULT_PERIOD * OUTAGE_TIME_FACTOR;
const OUTAGE_TIME_FACTOR: i64 = 5;
/// Environment variable that overrides the check period (see [`Store::period_seconds`]).
pub const ENV_PERIOD: &str = "NETPULSE_PERIOD";
/// On-disk format version of the [`Store`].
///
/// Serialized as its raw `u8` value via `serde_repr`, so the discriminants
/// below are part of the on-disk format and must never change.
#[derive(
Debug,
PartialEq,
Eq,
Hash,
Copy,
Clone,
DeepSizeOf,
PartialOrd,
Ord,
serde_repr::Serialize_repr,
serde_repr::Deserialize_repr,
)]
#[allow(missing_docs)] #[repr(u8)]
pub enum Version {
/// Initial store format.
V0 = 0,
/// Second store format.
V1 = 1,
/// Third store format (the current one, see [`Version::CURRENT`]).
V2 = 2,
}
/// Persistent container for all recorded [`Check`]s plus the format version.
///
/// Serialized to disk with bincode (optionally zstd-compressed when the
/// `compression` feature is enabled).
#[derive(Debug, PartialEq, Eq, Hash, Deserialize, Serialize, DeepSizeOf)]
pub struct Store {
// Format version this in-memory store currently conforms to.
version: Version,
// All recorded checks, in insertion order.
checks: Vec<Check>,
// Runtime-only flag: a readonly store refuses to save. Never serialized.
#[serde(skip)]
readonly: bool,
}
impl Display for Version {
    /// Formats the version as its raw numeric value (e.g. `2` for [`Version::V2`]).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let raw = self.raw();
        write!(f, "{raw}")
    }
}
impl TryFrom<u8> for Version {
    type Error = StoreError;
    /// Parses a raw version byte, rejecting values with no matching [`Version`].
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(Self::V0),
            1 => Ok(Self::V1),
            2 => Ok(Self::V2),
            unknown => Err(StoreError::BadStoreVersion(unknown)),
        }
    }
}
impl From<Version> for u8 {
    /// Converts a [`Version`] into its raw numeric value.
    fn from(value: Version) -> Self {
        // Same as `value.raw()`: the enum is `#[repr(u8)]`.
        value as u8
    }
}
impl Version {
    /// The store format version written by this build of netpulse.
    pub const CURRENT: Self = Self::V2;
    /// Every version this build can read and migrate from.
    ///
    /// NOTE(review): the identifier keeps its historical misspelling
    /// ("SUPPROTED") because it is public API referenced by callers.
    pub const SUPPROTED: &[Self] = &[Self::V0, Self::V1, Self::V2];
    /// Raw numeric value of this version.
    pub const fn raw(&self) -> u8 {
        *self as u8
    }
    /// The version that follows this one, or [`None`] for the latest.
    pub fn next(&self) -> Option<Self> {
        // The successor is simply raw + 1; `try_from` rejects anything
        // past the newest version, which maps to `None`.
        Self::try_from(self.raw().checked_add(1)?).ok()
    }
}
impl Store {
/// Full path of the store file on disk.
///
/// Uses the directory from the `NETPULSE_STORE_PATH` environment variable
/// when set, otherwise the compiled-in default directory.
pub fn path() -> PathBuf {
    match std::env::var_os(ENV_PATH) {
        Some(dir) => {
            let path = PathBuf::from(dir).join(DB_NAME);
            debug!("Store Path: {}", path.display());
            path
        }
        None => PathBuf::from(DB_PATH).join(DB_NAME),
    }
}
/// Constructs an empty, writable store at the current format version.
fn new() -> Self {
    Self {
        readonly: false,
        checks: Vec::new(),
        version: Version::CURRENT,
    }
}
/// Prepares the filesystem for the store: creates the parent directory of
/// the store file and hands ownership of it to the netpulse daemon user.
///
/// # Errors
///
/// Returns a [`StoreError`] if the daemon user cannot be resolved or does
/// not exist, the directory cannot be created, or its owner cannot be
/// changed. (Previously these cases panicked via `expect` despite the
/// `Result` return type.)
///
/// # Panics
///
/// Panics if the store path has no parent directory.
pub fn setup() -> Result<(), StoreError> {
    let path = Self::path();
    let parent_path = path
        .parent()
        .expect("the store path has no parent directory");
    // Resolve the daemon user; propagate failures instead of panicking so
    // callers can handle a missing user gracefully.
    let user = nix::unistd::User::from_name(DAEMON_USER)
        .map_err(std::io::Error::other)?
        .ok_or_else(|| {
            std::io::Error::new(std::io::ErrorKind::NotFound, "netpulse user not found")
        })?;
    fs::create_dir_all(parent_path)?;
    // The daemon runs as DAEMON_USER, so it must own the store directory.
    std::os::unix::fs::chown(parent_path, Some(user.uid.into()), Some(user.gid.into()))
        .inspect_err(|e| {
            error!("could not set owner of store directory to the daemon user: {e}")
        })?;
    Ok(())
}
/// Creates a new, empty store file on disk and returns the in-memory [`Store`].
///
/// Fails if the file already exists (`create_new(true)`); the file is
/// created with mode `0o644`.
///
/// # Errors
///
/// Returns a [`StoreError`] if the file cannot be created or the initial
/// (empty) store cannot be serialized and written.
pub fn create() -> Result<Self, StoreError> {
    let file = match fs::File::options()
        .read(false)
        .write(true)
        .append(false)
        .create_new(true)
        .mode(0o644)
        .open(Self::path())
    {
        Ok(file) => file,
        Err(err) => {
            error!("opening the store file for writing failed: {err}");
            return Err(err.into());
        }
    };
    let store = Store::new();
    #[cfg(feature = "compression")]
    {
        let mut writer = zstd::Encoder::new(file, ZSTD_COMPRESSION_LEVEL)?;
        writer.write_all(&bincode::serialize(&store)?)?;
        // finish() terminates the zstd frame; flush() alone leaves the
        // stream truncated and undecodable.
        writer.finish()?.flush()?;
    }
    #[cfg(not(feature = "compression"))]
    {
        let mut writer = file;
        writer.write_all(&bincode::serialize(&store)?)?;
        writer.flush()?;
    }
    Ok(store)
}
pub fn load_or_create() -> Result<Self, StoreError> {
match Self::load(false) {
Ok(store) => Ok(store),
Err(err) => match &err {
StoreError::DoesNotExist => Self::create(),
StoreError::Load { source } => {
error!("{err}");
#[allow(clippy::single_match)] match &(**source) {
bincode::ErrorKind::Io(io_err) => match io_err.kind() {
ErrorKind::UnexpectedEof => {
error!("The file ends too early, might be an old format, cut off, or empty. Not doing anything in case you need to keep old data");
}
_ => (),
},
_ => (),
}
Err(err)
}
_ => {
error!("Error while trying to load the store: {err:#}");
Err(err)
}
},
}
}
/// Loads the store from disk, migrating older supported formats in memory.
///
/// The migration is temporary until the store is saved. When `readonly` is
/// true the returned store refuses to save (see [`Store::save`]).
///
/// # Errors
///
/// Returns [`StoreError::DoesNotExist`] when the file is missing,
/// [`StoreError::UnsupportedVersion`] for unknown store versions, or a
/// wrapped I/O / deserialization error otherwise.
pub fn load(readonly: bool) -> Result<Self, StoreError> {
debug!("Trying to open the store");
let file = match fs::File::options()
.read(true)
.write(false)
.create_new(false)
.open(Self::path())
{
Ok(file) => file,
Err(err) => {
// Map "file missing" to the dedicated error so callers can create it.
match err.kind() {
ErrorKind::NotFound => return Err(StoreError::DoesNotExist),
ErrorKind::PermissionDenied => error!("Not allowed to access store"),
_ => (),
};
return Err(err.into());
}
};
debug!("Store file opened");
#[cfg(feature = "compression")]
let reader = zstd::Decoder::new(file)?;
#[cfg(not(feature = "compression"))]
let mut reader = file;
let mut store: Store = bincode::deserialize_from(reader)?;
if store.version != Version::CURRENT {
warn!("The store that was loaded is not of the current version: store has {} but the current version is {}", store.version, Version::CURRENT);
if Version::SUPPROTED.contains(&store.version) {
warn!("The different store version is still supported, migrating to newer version");
warn!("Temp migration in memory, can be made permanent by saving");
// NOTE(review): with SUPPROTED = [V0, V1, V2] and CURRENT = V2 this
// branch is currently unreachable; if a newer version ever becomes
// "supported", the assert below would still panic — confirm intent.
if store.version > Version::CURRENT {
warn!("The store version is newer than this version of netpulse can normally handle! Trying to ignore potential differences and loading as READONLY!");
store.readonly = true;
}
// Step version-by-version until the current format is reached,
// migrating every check at each step.
while store.version < Version::CURRENT {
for check in store.checks_mut().iter_mut() {
// NOTE(review): passes Version::V0 regardless of the store's
// actual version — confirm against Check::migrate's contract.
if let Err(e) = check.migrate(Version::V0) {
panic!("Error while migrating check '{}': {e}", check.get_hash());
}
}
store.version = store
.version
.next()
.expect("Somehow migrated to a version that does not exist");
}
assert_eq!(store.version, Version::CURRENT);
} else {
error!("The store version is not supported");
return Err(StoreError::UnsupportedVersion);
}
}
if readonly {
store.set_readonly();
}
Ok(store)
}
/// Serializes the store and writes it to the store file, truncating any
/// previous contents.
///
/// # Errors
///
/// Returns [`StoreError::IsReadonly`] for readonly stores,
/// [`StoreError::DoesNotExist`] if the file is missing (saving never
/// creates it — use [`Store::create`]), or a wrapped I/O/serialization
/// error otherwise.
pub fn save(&self) -> Result<(), StoreError> {
    info!("Saving the store");
    if self.readonly {
        return Err(StoreError::IsReadonly);
    }
    let file = match fs::File::options()
        .read(false)
        .write(true)
        .append(false)
        .create_new(false)
        .truncate(true)
        .create(false)
        .open(Self::path())
    {
        Ok(file) => file,
        Err(err) => match err.kind() {
            ErrorKind::NotFound => return Err(StoreError::DoesNotExist),
            _ => return Err(err.into()),
        },
    };
    #[cfg(feature = "compression")]
    {
        let mut writer = zstd::Encoder::new(file, ZSTD_COMPRESSION_LEVEL)?;
        writer.write_all(&bincode::serialize(&self)?)?;
        // finish() terminates the zstd frame; flush() alone leaves the
        // stream truncated and undecodable.
        writer.finish()?.flush()?;
    }
    #[cfg(not(feature = "compression"))]
    {
        let mut writer = file;
        writer.write_all(&bincode::serialize(&self)?)?;
        writer.flush()?;
    }
    Ok(())
}
/// Appends a single check to the store's history.
pub fn add_check(&mut self, check: impl Into<Check>) {
    let check: Check = check.into();
    self.checks.push(check);
}
/// All checks recorded in the store, in insertion order.
pub fn checks(&self) -> &[Check] {
    self.checks.as_slice()
}
/// Seconds between checks, taken from the `NETPULSE_PERIOD` environment
/// variable when it is set and parseable, otherwise [`DEFAULT_PERIOD`].
pub fn period_seconds(&self) -> i64 {
    std::env::var(ENV_PERIOD)
        .ok()
        .map(|raw| raw.parse().unwrap_or(DEFAULT_PERIOD))
        .unwrap_or(DEFAULT_PERIOD)
}
/// blake3 hash of the serialized in-memory store.
///
/// # Panics
///
/// Panics if the store fails to serialize.
pub fn get_hash(&self) -> blake3::Hash {
    let encoded = bincode::serialize(&self).expect("serialization of the store failed");
    blake3::hash(&encoded)
}
/// Hashes the store file on disk by shelling out to `sha256sum`.
///
/// Returns the hex digest, i.e. the first whitespace-separated token of
/// the command's stdout (`sha256sum` prints "<hash>  <path>").
///
/// # Errors
///
/// Returns a [`StoreError`] if the command cannot be run, exits with a
/// non-zero status, or produces non-UTF-8 output.
pub fn get_hash_of_file(&self) -> Result<String, StoreError> {
    let out = Command::new("sha256sum").arg(Self::path()).output()?;
    if !out.status.success() {
        // Fixed: this previously labeled stderr as "Stdin".
        error!(
            "error while making the hash over the store file:\nStdout\n{:?}\n\nStderr\n{:?}",
            out.stdout, out.stderr
        );
        return Err(StoreError::ProcessEndedWithoutSuccess);
    }
    Ok(std::str::from_utf8(&out.stdout)?
        .split_whitespace()
        .next()
        .unwrap_or_default()
        .to_string())
}
/// Runs all default checks once, appends the results to the store, and
/// returns references to only the checks created by this call.
pub fn make_checks(&mut self) -> Vec<&Check> {
    // Number of checks that existed before this run. The previous code
    // computed the index of the last old check (len - 1) and skipped that
    // many, which wrongly included the last pre-existing check in the
    // returned "new" checks.
    let old_len = self.checks.len();
    Self::primitive_make_checks(&mut self.checks);
    self.checks.iter().skip(old_len).collect()
}
/// Performs every enabled check against every target concurrently and
/// appends the results to `buf`.
///
/// ICMP checks are skipped when the process lacks `CAP_NET_RAW`. Each
/// (check type, target) pair runs in its own thread; results are collected
/// from the threads' return values instead of a shared `Mutex<Vec>`, so no
/// locking is needed.
///
/// # Panics
///
/// Panics if a `TARGETS` constant is not a valid IP address or a worker
/// thread panics.
pub fn primitive_make_checks(buf: &mut Vec<Check>) {
    let mut threads = Vec::new();
    for check_type in CheckType::default_enabled() {
        trace!("check type: {check_type}");
        if *check_type == CheckType::Icmp && !has_cap_net_raw() {
            warn!("Does not have CAP_NET_RAW, can't use {check_type}, skipping");
            continue;
        }
        for target in TARGETS {
            threads.push(std::thread::spawn(move || {
                trace!("start thread for {target} with {check_type}");
                let check = check_type.make(
                    std::net::IpAddr::from_str(target)
                        .expect("a target constant was not an Ip Address"),
                );
                trace!("end thread for {target} with {check_type}");
                check
            }));
        }
    }
    // Collect results in spawn order; joining also surfaces worker panics.
    for th in threads {
        buf.push(th.join().expect("could not join thread"));
    }
}
/// The store format [`Version`] this in-memory store conforms to.
pub fn version(&self) -> Version {
self.version
}
/// Mutable access to all recorded checks (used e.g. during migration).
pub fn checks_mut(&mut self) -> &mut Vec<Check> {
&mut self.checks
}
/// Reads just the [`Version`] from the store file without loading the
/// whole store.
///
/// # Errors
///
/// Returns a [`StoreError`] if the file cannot be opened or the version
/// field cannot be decoded.
pub fn peek_file_version() -> Result<Version, StoreError> {
    /// Prefix-only view of the on-disk store layout: the `version` field
    /// comes first, and bincode stops reading after it, so the trailing
    /// data (the checks) is never touched. The previous `_rest:
    /// IgnoredAny` field was `#[serde(skip)]`ped and therefore dead.
    #[derive(Deserialize)]
    struct VersionOnly {
        version: Version,
    }
    let file = std::fs::File::open(Self::path())?;
    #[cfg(feature = "compression")]
    let reader = zstd::Decoder::new(file)?;
    #[cfg(not(feature = "compression"))]
    let reader = file;
    let version_only: VersionOnly = bincode::deserialize_from(reader)?;
    Ok(version_only.version)
}
/// Whether this store refuses to be saved (see [`Store::save`]).
pub fn readonly(&self) -> bool {
self.readonly
}
/// Marks this store as readonly; afterwards [`Store::save`] returns an error.
pub fn set_readonly(&mut self) {
self.readonly = true;
}
}
/// Reports whether this process may open raw sockets: either it runs as
/// root or its effective capability set contains `CAP_NET_RAW`.
fn has_cap_net_raw() -> bool {
    if nix::unistd::getuid().is_root() {
        return true;
    }
    match caps::read(None, caps::CapSet::Effective) {
        Ok(effective) => effective.contains(&caps::Capability::CAP_NET_RAW),
        Err(_) => {
            warn!("Could not read capabilities");
            false
        }
    }
}