1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
use std::{
collections::BTreeSet,
convert::AsRef,
path::{Path, PathBuf},
time::Duration,
};
use bincode::{Decode, Encode};
use futures::future::try_join_all;
use itertools::Itertools;
use regex::Regex;
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::{debug, debug_span, error, info, instrument, trace, Instrument};
use super::NixConfiguration;
/// Error interacting with the `Nix` store
///
/// Covers failures enumerating the store directory, handling store path prefixes, building
/// include regexes, parsing store paths, and querying a binary cache.
// NOTE(review): `snafu` is believed to use each variant's doc comment as its default `Display`
// message when no `#[snafu(display(...))]` attribute is given, so rewording a variant doc comment
// may change user-visible error text — confirm against the `snafu` version in use.
#[derive(Debug, Snafu)]
#[non_exhaustive]
pub enum StoreError {
/// Failure initially enumerating the nix store
// Produced when the initial `read_dir` on the configured store path fails.
InitialEnumeration {
/// Underlying IO error
source: std::io::Error,
/// The configuration in use
configuration: NixConfiguration,
},
/// Mid-enumeration failure
// Produced when reading an individual directory entry fails after `read_dir` itself succeeded.
Enumeration {
/// Underlying IO error
source: std::io::Error,
/// The configuration in use
configuration: NixConfiguration,
},
/// Error stripping prefix, this is indicative of a configuration error
// Produced by `StoreState::add_path` when the path does not start with the configured store path.
StripPrefix {
/// Underlying error
source: std::path::StripPrefixError,
},
/// Error building a regex, this really shouldn't happen
Regex {
/// Underlying error
source: regex::Error,
},
/// Error parsing a nix store path
PathParsing,
/// Error checking the cache
// The generated context selector for this variant is used in this file as `CacheSnafu`.
CacheError {
/// Underlying `surf` error
// Stored as a plain field named `e` rather than `source`; presumably because `surf::Error`
// cannot be used as a `snafu` source — verify before renaming.
e: surf::Error,
},
/// Tried to compare stores with different prefixes
// Produced by `StoreState::diff` when the two states have different `store_path` values.
DifferentPrefixes,
}
/// The state of the `Nix` store at a given point in time
///
/// Paths are held relative to the configured store path: `add_path` strips that prefix on
/// insertion and `paths` joins it back on when iterating.
#[derive(Clone, Debug, PartialEq, Eq, Default, Encode, Decode)]
pub struct StoreState {
/// Currently in use nix configuration
config: NixConfiguration,
/// The set of all top level paths in the store at the time the snapshot was taken, stored
/// relative to `config.store_path`
store_state: BTreeSet<PathBuf>,
}
impl StoreState {
/// Build a `StoreState` that tracks no paths yet and is bound to the given `Nix` configuration
pub fn new(config: NixConfiguration) -> Self {
    // Start from an empty path set; entries are added later through `add_path`
    let store_state = BTreeSet::new();
    Self {
        config,
        store_state,
    }
}
/// Record a store path in this `StoreState`
///
/// The configured store prefix is stripped from `path` before insertion, so only the
/// store-relative portion is kept.
///
/// # Errors
///
/// Returns [`StoreError::StripPrefix`] if the configured store path is not a prefix of `path`,
/// which points at a misconfiguration
pub fn add_path(&mut self, path: impl AsRef<Path>) -> Result<(), StoreError> {
    // Reduce the path to its store-relative form first, bailing out on a prefix mismatch
    let relative = path
        .as_ref()
        .strip_prefix(&self.config.store_path)
        .context(StripPrefixSnafu)?;
    self.store_state.insert(relative.to_owned());
    Ok(())
}
/// Iterate over every tracked path, re-rooted under the configured store path
pub fn paths(&self) -> impl Iterator<Item = PathBuf> + '_ {
    let store_root = &self.config.store_path;
    // Entries are stored relative to the store root, so join the root back on for callers
    self.store_state
        .iter()
        .map(move |relative| store_root.join(relative))
}
/// Return all paths that are different between this `StoreState` and another `StoreState`
///
/// This returns elements that are in `other`, but not in `self`
///
/// # Errors
///
/// Will return an error if two stores with incompatible configuration are compared.
pub fn diff(&self, other: &StoreState) -> Result<Self, StoreError> {
// Make sure the store paths are the same, and bail otherwise
if self.config.store_path != other.config.store_path {
return DifferentPrefixesSnafu.fail();
}
let filtered_state: BTreeSet<_> = other
.store_state
.difference(&self.store_state)
.cloned()
.collect();
Ok(Self {
config: other.config.clone(),
store_state: filtered_state,
})
}
/// Number of distinct paths currently tracked by this `StoreState`
pub fn path_count(&self) -> usize {
    let tracked = &self.store_state;
    tracked.len()
}
/// Snapshot the `Nix` store described by the given [`NixConfiguration`]
///
/// Every top level entry of the store directory is inspected. An entry is recorded when it is
/// a directory (treated as an installable) or when its full path matches at least one of the
/// supplied `includes` regexes. Entries whose metadata cannot be read are skipped.
///
/// # Errors
///
/// Will propagate any IO errors that occur trying to enumerate the `Nix` store, as well as a
/// prefix-stripping error if an entry cannot be made relative to the store path
#[instrument(err)]
pub fn from_store_including(
    configuration: &NixConfiguration,
    includes: &[Regex],
) -> Result<Self, StoreError> {
    info!("Generating store state");
    // Accumulator for every path that passes the filter
    let mut state = Self::new(configuration.clone());
    // Open the store directory for enumeration
    let entries = std::fs::read_dir(&configuration.store_path).with_context(|_| {
        InitialEnumerationSnafu {
            configuration: configuration.clone(),
        }
    })?;
    for entry in entries {
        // A failure on an individual entry aborts the whole snapshot
        let entry = entry.with_context(|_| EnumerationSnafu {
            configuration: configuration.clone(),
        })?;
        trace!(?entry);
        // Entries whose metadata is unavailable are silently skipped
        let metadata = match entry.metadata() {
            Ok(metadata) => metadata,
            Err(_) => continue,
        };
        let path = entry.path();
        // The regexes are matched against a lossy utf-8 rendering of the path, which keeps
        // them ergonomic; top level store paths are expected to be well behaved
        let rendered = path.to_string_lossy();
        // Any directory is assumed to be an installable
        // TODO: Interact with nix to get a more "correct" solution here.
        let keep = metadata.is_dir() || includes.iter().any(|regex| regex.is_match(&rendered));
        if keep {
            state.add_path(&path)?;
        }
    }
    Ok(state)
}
/// Snapshot the `Nix` store described by the given [`NixConfiguration`] using the default
/// include filters
///
/// This delegates to [`from_store_including`](Self::from_store_including).
///
/// The default filters will include:
/// - Any installable (non-disableable)
/// - Any `nix-shell` derivations
///
/// # Errors
///
/// Will propagate any IO errors that occur trying to enumerate the `Nix` store, or a regex
/// error if the built-in filter fails to compile
#[instrument(err)]
pub fn from_store(configuration: &NixConfiguration) -> Result<Self, StoreError> {
    // The only default filter beyond installables: `nix-shell` derivations
    let default_includes = [Regex::new(r".*nix-shell\.drv").context(RegexSnafu)?];
    Self::from_store_including(configuration, &default_includes)
}
/// Filter this `Store` state against the given nix binary cache, removing any paths that are
/// present in the cache. The provided cache uri is expected not to have a trailing slash
///
/// # Errors
///
/// Will propagate any network or nix store interaction errors that occur
#[instrument(skip(self))]
pub async fn filter_against_cache(self, cache_uri: &str) -> Result<Self, StoreError> {
let max_connections = 100;
let client: surf::Client = surf::Config::new()
.set_timeout(Some(Duration::from_millis(1000)))
.set_max_connections_per_host(max_connections)
.try_into()
.unwrap();
let cache_uri: String = cache_uri.to_owned();
let mut filtered_state: BTreeSet<PathBuf> = BTreeSet::new();
// Chunk the state by the number of allowed connections
for chunk in &self.store_state.into_iter().chunks(max_connections * 4) {
// Await all the chunks at the same time
let filtered_chunk = try_join_all(chunk.map(|store_path| {
let client = client.clone();
let cache_uri = cache_uri.clone();
async move {
trace!(?store_path, "Checking path against cache");
// Construct the combined uri, first peel off the hash
//
// The hash should always be the first element, as the store internally
// strips its paths of the store prefix
let hash = store_path
.to_string_lossy()
.split('-')
.next()
.context(PathParsingSnafu)?
.to_owned();
// Now build the expected narinfo uri
let narinfo_uri = format!("{cache_uri}/{hash}.narinfo");
trace!(?narinfo_uri, "Checking for narinfo on remote");
// Now check the cache for it
let result = client.send(surf::head(&narinfo_uri)).await;
// Processing this error is actually kind of involved, we want to
// include it if we get a response code of 404, include it if we get a
// 200 OK, and error if we encounter any other response code
match result {
// If this was okay, the narinfo is in the cache so we can filter it
// out
Ok(_) => {
debug!(?store_path, "Store path in cache, filtering");
Ok(None)
}
// If there was an error, check to see if it was a 404
Err(e) => {
if e.status() == surf::StatusCode::NotFound {
// In this case, we can include the path
trace!(?store_path, "Store path not in cache");
Ok(Some(store_path.clone()))
} else {
// In this case we need to bubble up the error
let status = e.status();
error!(?e, ?status, ?narinfo_uri, "Error interacting with cache");
CacheSnafu { e }.fail()
}
}
}
}
.instrument(debug_span!("Narinfo presence checking task"))
}))
.await?;
filtered_state.extend(filtered_chunk.into_iter().flatten());
}
Ok(Self {
config: self.config,
store_state: filtered_state,
})
}
}