//! Fair Resource Distribution Algorithm
//!
//! This project provides an implementation of the `fairdist` algorithm, a way to distribute a set
//! of resources fairly amongst a dynamic number of users. Its infrastructure manages a hierarchy
//! of users that currently hold shares of a resource. New users can be added at any time, and
//! charges can be requested and released dynamically. The system ensures that every new user
//! joining the system gets a guaranteed share of the total, regardless of how many users already
//! hold resources.
//!
//! The API abstracts over resources through the `[counter::Counter]` trait. This trait represents
//! a set of numeric resource counters. In most cases these will just be `usize` integers.
//! However, that is not a requirement. The resource counter is only required to "look like an
//! integer" (see `[counter::Counter]` and its dependencies on the `num` crate). Furthermore, the
//! API abstracts over users. It does not track users who interact with the system, but relies on
//! the API user to provide a key or index that identifies the user of a given operation. This way
//! the resource allocators can tell whether separate operations are performed by the same
//! accounting user, or whether they are different. This is all the information the system needs
//! about the different users.
//!
//! At the root level, the API user has to create a `[Resource]` object with the resource counters
//! initialized to reflect the total resources available for distribution. These resources can now
//! be requested through `[Charge]` objects. Charges are RAII-type objects that record a resource
//! charge and make sure resources are released again when the `Charge` is dropped. To request
//! resources, the API user must specify the accounting user to perform the charge as, as well as
//! the amount to charge. If the quotas of the accounting user would be exceeded, the charge is
//! rejected. Otherwise, the charge will be granted.
//! The allocators behind the resource objects make sure resources are distributed fairly. In
//! addition to the root-level `[Resource]` objects, the API allows creating sub-hierarchies to
//! redistribute resources further. That is, new non-root-level `[Resource]` objects can be
//! created given the accounting user to act as. This new sub-resource will then behave similarly
//! to the top-level resource and allow charges to be performed. It will ensure the same
//! guarantees on its own sub-level, so the sub-distribution is again fair. Furthermore, the
//! top-level guarantees are unaffected by sub-hierarchies that are formed. That is, individual
//! accounting users cannot increase their resource shares by creating sub-hierarchies.
//!
//! There are multiple possible algorithms that can be used for the individual allocations. In
//! most cases the selected default is sufficient. However, the `[allocator::Allocator]` trait
//! allows selecting from a range of predefined allocators, as well as creating custom allocators.
//! The default allocator guarantees quasilinear shares to every user of the system. In
//! particular, this means that regardless of how many users join the allocation system, each one
//! is guaranteed a share of 1 over `O(n * log(n)^2)` of the total.

use std::sync::{Arc, Weak};

pub mod allocator;
pub mod counter;

// Reference to a Registry Object
//
// Registries hold our internal usage maps as well as the associated resource counters. Since we
// need shared access to the registry from different threads, we put it into a reference-counted
// lock. The `RegistryRef` type is an alias for this.
type RegistryRef<A, C, I> = Arc<std::sync::Mutex<Registry<A, C, I>>>;

// Usage table of a single consumer
//
// The resources of a registry are distributed amongst different users. The users are not
// represented in any way. Instead, we only require users to have a unique index of type `I`. We
// then track a usage table for each user.
// This table records what the user did so far and makes sure all accesses through the same user
// share a common usage table.
//
// Note that the usage table of a specific user can be pinned via the `[Subscription]` type. This
// avoids doing a look-up on each modification. However, it means access to the usage table is
// shared. Therefore, the `[Subscription]` type implements safe sharing of references to the usage
// table.
//
// All fields except for `share` are static. They are not modified during the lifetime of a usage
// table. The `share` field is protected by the mutex of the linked registry. However, for
// performance reasons it is located on the usage table rather than as a separate lookup tree on
// the registry. You must lock the registry when accessing `share`!
struct Usage<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    index: I,
    registry_ref: RegistryRef<A, C, I>,
    share: std::cell::RefCell<C>,
}

// Resource registry
//
// A resource registry is the hidden root object for resource distribution. The registry tracks
// the resources that were assigned to it, as well as all usage tables which got shares of its
// resources.
//
// A resource registry is usually hidden behind a reference counted mutex, which must be held to
// perform any kind of modification.
//
// Resource registries form a hierarchy. The root level resource has no parent resource but has
// resources assigned statically. Each subscription to this resource registry can itself be turned
// into a resource registry, though. This allows subdividing and distributing resources amongst
// hierarchical systems. Note that sub-hierarchies never affect the shares granted on parent or
// sibling hierarchies. That is, access to sub-hierarchies can be delegated to untrusted parties
// without affecting the security of the entire system.
struct Registry<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    parent: Option<Subscription<A, C, I>>,
    usages: std::collections::BTreeMap<I, Weak<Usage<A, C, I>>>,
    reserve: C,
}

/// A subscription to a resource.
///
/// A subscription allows performing continuous allocations on a resource. A subscription is
/// simply the combination of a resource and the user ID to perform accesses as. That is, a
/// subscription can be created from a resource by providing the user ID to subscribe as. All
/// subscriptions of different user IDs are distinct, but subscriptions with the same user ID
/// reference the same underlying usage table.
///
/// To request resources from a resource registry, you have to subscribe to it first. This
/// subscription is then used to request the resources in a charge. The subscription is pinned in
/// every charge, so there is no need for the caller to cache it. However, if continuous charges
/// are required, caching the subscription will skip a map lookup on every charge. Furthermore, as
/// long as a subscription is alive, the underlying resource registry considers the subscriber to
/// be interested in the resources. The resource allocation will thus be able to improve the
/// fairness of the resource distribution, because it can better estimate how many users are
/// interested in resources.
pub struct Subscription<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    usage_ref: Option<Arc<Usage<A, C, I>>>,
}

/// A resource that can be distributed amongst users.
///
/// The resource object represents a set of resources that is offered for distribution amongst
/// users. Resource objects are either assigned static resources at initialization, or they
/// distribute resources of their parent to form a hierarchy.
///
/// Any resource object can be used to subscribe to and then charge resource requests through that
/// subscription.
///
/// Resource objects are merely a reference to the underlying resource registry object. Even if
/// the resource object is destroyed, the underlying resource registry object will remain alive
/// until all charges on it have been dropped.
pub struct Resource<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    registry_ref: RegistryRef<A, C, I>,
}

/// A charge represents temporarily borrowed resources.
///
/// When requesting resource shares from a resource object, a `Charge` object will be created.
/// This is an RAII object that represents the charge. When the object is dropped, the charge
/// will be released as well.
///
/// Note that shares on a charge object can be manually increased or decreased, though in that
/// case the RAII benefits are lost.
pub struct Charge<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    subscription: Subscription<A, C, I>,
    slot: C::Index,
    charge: C::Scalar,
}

impl<A, C, I> Usage<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    // Internal helper to initialize a usage object.
    //
    // This initializes a new `Usage` object. Both `index` and `registry_ref` are cloned and
    // stored in the object. The `reserve` parameter is used as a template to create a new counter
    // object. The `reserve` is passed separately, since `registry_ref` might be locked by the
    // caller, so its reserve cannot be accessed here. The caller must make sure the passed
    // reserve is compatible with the one in the registry, though (usually, you would just pass a
    // reference to the reserve in `registry_ref` here).
    fn new(index: &I, registry_ref: &RegistryRef<A, C, I>, reserve: &C) -> Self {
        Usage {
            index: index.clone(),
            registry_ref: registry_ref.clone(),
            share: std::cell::RefCell::new(counter::Counter::from_template(reserve)),
        }
    }

    // Charge Backend
    //
    // This is the backend that performs resource charges.
    // It tries to charge `amount` resources from the resource slot `index`. On success, true is
    // returned. Otherwise, false is returned.
    //
    // This function performs a recursive charge. It tries to serve the request on the current
    // level, but if that is not sufficient, it will traverse upwards.
    fn charge(&self, index: &C::Index, amount: &C::Scalar) -> bool {
        let mut guard = self.registry_ref.lock().unwrap();
        let mut share = self.share.borrow_mut();
        let registry = &mut *guard;
        let n_usages = registry.usages.len();
        let reserve_slot = registry.reserve.slot(index);
        let share_slot = share.slot(index);

        match <A as allocator::Allocator<C::Scalar>>::minimum_reserve_for(
            n_usages,
            share_slot,
            amount,
        ) {
            None => {
                false
            }
            Some(mut minimum) => {
                if &*reserve_slot >= &minimum {
                    *share_slot += amount;
                    *reserve_slot -= amount;
                    true
                } else {
                    minimum -= reserve_slot;
                    if let Some(subscription) = registry.parent.as_ref() {
                        // XXX: This currently recurses into the charge helper. We should instead
                        //      do this via frame objects on a `Vec`. However, that requires
                        //      investigating how to store mutex RAII guards in the same `Vec`...
                        if subscription.deref().charge(index, &minimum) {
                            *share_slot += amount;
                            *reserve_slot -= amount;
                            true
                        } else {
                            false
                        }
                    } else {
                        false
                    }
                }
            }
        }
    }

    // Discharge Backend
    //
    // This is the backend implementation of a discharge. It releases `amount` resources of the
    // resource slot `index`. It is the caller's responsibility to guarantee the resources were
    // previously allocated through a charge.
    fn discharge(&self, index: &C::Index, amount: &C::Scalar) {
        let mut guard = self.registry_ref.lock().unwrap();
        let mut share = self.share.borrow_mut();
        let share_slot = share.slot(index);
        let reserve_slot = guard.reserve.slot(index);

        // This simply releases the resources to the current level. It never traverses the parent
        // hierarchy to release further resources, as it would provide little gain but cost a lot
        // of computing time.
        *share_slot -= amount;
        *reserve_slot += amount;
    }
}

impl<A, C, I> Drop for Usage<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    // Verify all shares were dropped, to protect against charge leaks.
    fn drop(&mut self) {
        assert!(self.share.borrow().empty());
    }
}

impl<A, C, I> Registry<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    // Create a new subscription object.
    //
    // This is the internal backend that creates a new subscription object given a registry and an
    // index. Note that while this will create a new subscription, it will still share the pinned
    // usage with all other subscriptions of this index.
    //
    // This is an associated function since we need access to the Arc so we can clone it for the
    // new subscription.
    fn subscribe(registry_ref: &RegistryRef<A, C, I>, index: &I) -> Subscription<A, C, I> {
        // Lock the registry and search for a usage entry indexed by `index`. If none is available
        // create a new one and insert it into the registry. The result of this block is a usage
        // Arc to either the existing or the new entry.
        let mut guard = registry_ref.lock().unwrap();
        let registry = &mut *guard;
        let usage = match registry.usages.entry(index.clone()) {
            std::collections::btree_map::Entry::Vacant(entry) => {
                // No entry exists. Create a new one, insert a weak reference into the lookup
                // tree and return the strong reference.
                let usage = Arc::new(
                    Usage::new(
                        index,
                        registry_ref,
                        &registry.reserve,
                    )
                );
                entry.insert(Arc::downgrade(&usage));
                usage
            },
            std::collections::btree_map::Entry::Occupied(entry) => {
                // There is already an entry. Since the resource is locked, we know there
                // must be another strong reference, so the upgrade must be successful.
                Weak::upgrade(entry.get()).unwrap()
            },
        };

        Subscription {
            usage_ref: Some(usage),
        }
    }
}

impl<A, C, I> Subscription<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    // Dereference a subscription to the underlying usage.
    //
    // This borrows the subscription and returns a reference to the underlying usage entry. It is
    // basically equivalent to `std::ops::Deref<Target = Usage<A, C, I>>`. However, we cannot
    // implement `Deref`, since we would leak the private type `Usage` through the public type
    // `Subscription`.
    fn deref(&self) -> &Usage<A, C, I> {
        self.usage_ref.as_ref().unwrap()
    }

    /// Create a new charge.
    ///
    /// This is a shortcut for creating a new `[Charge]` object and calling `[Charge::charge()]`.
    /// If the charge request succeeds, this function returns the `[Charge]` object. Otherwise,
    /// `None` is returned.
    pub fn charge(&self, slot: &C::Index, amount: &C::Scalar) -> Option<Charge<A, C, I>> {
        let mut charge = Charge::new(self.clone(), slot);
        if charge.charge(amount) {
            Some(charge)
        } else {
            None
        }
    }
}

impl<A, C, I> Drop for Subscription<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    fn drop(&mut self) {
        // We have to drop the usage-arc we hold. We have to do that manually, since if we are the
        // last one, we have to unlink the usage from the registry. For that, we steal the Arc
        // from the `Subscription` object here, so it becomes `None` and the Rust runtime can
        // later on tear down the `Subscription` object itself.
        if let Some(usage_ref) = self.usage_ref.take() {
            // XXX: Preferably, we would drop the Arc without acquiring the lock, unless we are
            //      the last. However, the Arc implementation does not provide anything like that.
            //      We would want something like the existing `Arc::try_unwrap()`, but one that
            //      drops the Arc in the failure case, rather than returning it.
            // Since that does not exist, we instead always acquire and lock the resource.
            // We then drop the Arc from under the lock under all circumstances.
            let registry_ref = usage_ref.registry_ref.clone();
            let mut guard = registry_ref.lock().unwrap();
            let registry = &mut *guard;

            // If we are the last strong reference, make sure to drop the referenced usage from
            // its parent map. Note that the registry is locked, so no lookups can happen.
            // Additionally, we own the last strong reference, so there is no way anyone else can
            // acquire a strong reference anymore (ignoring weak-refs, which we do not make use
            // of outside of the lookup tree).
            match Arc::try_unwrap(usage_ref) {
                Ok(usage) => {
                    // We destructed the usage-arc, so now also drop it from the lookup-tree.
                    registry.usages.remove(&usage.index);
                },
                Err(usage_ref) => {
                    // We are not the last reference. Make sure to drop our Arc before releasing
                    // the lock. Note that this drop is for documentation purposes only, since
                    // the variable is only valid in this block, anyway.
                    std::mem::drop(usage_ref);
                }
            }
        }
    }
}

impl<A, C, I> Clone for Subscription<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    fn clone(&self) -> Self {
        Subscription {
            usage_ref: Some(self.usage_ref.as_ref().unwrap().clone()),
        }
    }
}

impl<A, C, I> Resource<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    /// Create a new top-level resource with a given reserve.
    ///
    /// This consumes `reserve` and creates a new `Resource` object around it. It will be a
    /// top-level resource. The resources provided through `reserve` will now be available for
    /// distribution through subscriptions to this new `Resource` object.
    pub fn with_reserve(reserve: C) -> Self {
        Resource {
            registry_ref: Arc::new(
                std::sync::Mutex::new(
                    Registry {
                        parent: None,
                        usages: Default::default(),
                        reserve,
                    }
                )
            ),
        }
    }

    /// Create a new sub-hierarchy resource.
    ///
    /// This creates a new `Resource` object as child of the resource object of the given
    /// subscription.
    /// The new resource has no reserves of its own, but only redistributes the parent resources
    /// in a new hierarchy.
    pub fn with_parent(parent: Subscription<A, C, I>) -> Self {
        let reserve = counter::Counter::from_template(
            &parent.deref()
                .registry_ref.lock()
                .unwrap()
                .reserve
        );

        Resource {
            registry_ref: Arc::new(
                std::sync::Mutex::new(
                    Registry {
                        parent: Some(parent),
                        usages: Default::default(),
                        reserve,
                    }
                )
            ),
        }
    }

    /// Subscribe to a resource.
    ///
    /// This creates a new `Subscription` object, subscribed to the given resource and index. The
    /// subscription will allow you to request resource charges.
    pub fn subscribe(&self, index: &I) -> Subscription<A, C, I> {
        Registry::subscribe(&self.registry_ref, index)
    }
}

impl<A, C, I> Charge<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    /// Create a new Charge object.
    ///
    /// This creates a new `Charge` object using the given subscription. The `slot` argument
    /// specifies the resource slot to use for any resource request.
    pub fn new(subscription: Subscription<A, C, I>, slot: &C::Index) -> Self {
        Charge {
            subscription,
            slot: slot.clone(),
            charge: num::Zero::zero(),
        }
    }

    /// Charge further resources through an existing Charge object.
    ///
    /// This tries to increase the charges of the `Charge` object by `amount`. If the charge is
    /// rejected, false is returned and the `Charge` object stays unmodified. If the charge
    /// succeeds, true is returned and the `Charge` object will increase its charge counter by
    /// `amount`.
    pub fn charge(&mut self, amount: &C::Scalar) -> bool {
        let usage = self.subscription.deref();
        let r = usage.charge(&self.slot, amount);
        if r {
            self.charge += amount;
        }
        r
    }

    /// Discharge previously charged resources from a Charge object.
    ///
    /// This releases resources back to the originating `Resource` object. The resources must have
    /// been acquired previously via `[Charge::charge()]`.
    ///
    /// The caller is free to split charges and discharges.
    /// That is, the caller is free to only release partial amounts of a previous charge, or
    /// combine multiple individual charges into a single discharge.
    ///
    /// It is the caller's responsibility to never release more charges than were actually
    /// acquired.
    pub fn discharge(&mut self, amount: &C::Scalar) {
        // Releasing more than is currently held would corrupt the accounting.
        assert!(amount <= &self.charge);
        let usage = self.subscription.deref();
        usage.discharge(&self.slot, amount);
        self.charge -= amount;
    }

    /// Discharge all previously charged resources.
    ///
    /// This discharges all resources of this `Charge` object. This is also automatically done
    /// when the `Charge` object is dropped.
    pub fn discharge_all(&mut self) {
        let usage = self.subscription.deref();
        usage.discharge(&self.slot, &self.charge);
        self.charge = num::Zero::zero();
    }
}

impl<A, C, I> Drop for Charge<A, C, I>
where
    A: allocator::Allocator<C::Scalar>,
    C: counter::Counter,
    I: Clone + Ord,
{
    fn drop(&mut self) {
        // When a charge object is dropped, we need to discharge all pending charges. We simply
        // call into the discharge helpers. No special considerations need to be taken.
        self.discharge_all();
        assert!(num::Zero::is_zero(&self.charge));
    }
}
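
// Illustrative sketch (not part of the crate API)
//
// The charge path above boils down to: ask the allocator for the minimum reserve a request
// needs, grant the request only if the reserve covers it, and hand the resources back when the
// RAII guard is dropped. The following standalone model shows that flow on a single level with
// plain `u64` counters. Everything in it is an assumption made for illustration: `ToyPool`,
// `ToyCharge`, `toy_charge()`, `demo()` and the rule inside `minimum_reserve_for()` are
// hypothetical stand-ins and do not reproduce the allocators shipped in the `allocator` module.

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;

/// Hypothetical allocator rule: the reserve must hold at least
/// `(share + amount) * (n_users + 1)` for a request to be granted.
/// `None` signals arithmetic overflow, which callers treat as a rejection.
fn minimum_reserve_for(n_users: usize, share: u64, amount: u64) -> Option<u64> {
    let total = share.checked_add(amount)?;
    total.checked_mul((n_users as u64).checked_add(1)?)
}

/// Toy single-level pool: one reserve plus the share held by each user.
struct ToyPool {
    reserve: u64,
    shares: BTreeMap<u32, u64>,
}

/// RAII guard that returns its amount to the pool when dropped.
struct ToyCharge {
    pool: Rc<RefCell<ToyPool>>,
    user: u32,
    amount: u64,
}

/// Try to charge `amount` for `user`; returns a guard on success.
fn toy_charge(pool: &Rc<RefCell<ToyPool>>, user: u32, amount: u64) -> Option<ToyCharge> {
    let mut p = pool.borrow_mut();
    // Register the user first, so it counts towards the number of users.
    // In this toy model a rejected user simply stays registered.
    let share = *p.shares.entry(user).or_insert(0);
    let minimum = minimum_reserve_for(p.shares.len(), share, amount)?;
    if p.reserve < minimum {
        return None;
    }
    // `minimum >= amount` always holds, so this cannot underflow.
    p.reserve -= amount;
    *p.shares.entry(user).or_insert(0) += amount;
    Some(ToyCharge { pool: Rc::clone(pool), user, amount })
}

impl Drop for ToyCharge {
    fn drop(&mut self) {
        // Release the charged amount back to the pool's reserve.
        let mut p = self.pool.borrow_mut();
        p.reserve += self.amount;
        if let Some(share) = p.shares.get_mut(&self.user) {
            *share -= self.amount;
        }
    }
}

/// Runs two requests against a pool of 100 units and reports
/// (first granted, second granted, reserve after all guards dropped).
fn demo() -> (bool, bool, u64) {
    let pool = Rc::new(RefCell::new(ToyPool { reserve: 100, shares: BTreeMap::new() }));
    // One user: needs (0 + 40) * 2 = 80 <= 100, so the request is granted.
    let first = toy_charge(&pool, 1, 40);
    // Two users: needs (0 + 40) * 3 = 120 > 60, so the request is rejected.
    let second = toy_charge(&pool, 2, 40);
    let granted = (first.is_some(), second.is_some());
    drop(first);
    drop(second);
    let reserve = pool.borrow().reserve;
    (granted.0, granted.1, reserve)
}
```

// In this model the second user is not locked out: a smaller request of 20 units needs only
// (0 + 20) * 3 = 60 and is granted while the first charge is still held. The real crate differs
// in several ways, for example it forwards shortfalls to the parent registry and protects the
// registry with a mutex rather than a `RefCell`.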