parenchyma 0.0.33

A high-performance computing (HPC) framework
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
use std::mem;
use std::cell::{Cell, RefCell};
use std::marker::PhantomData;

use {Alloc, ComputeDevice, Device, ErrorKind, Memory, Result, Synch};
use utility::Has;

/// A shared tensor for framework-agnostic, memory-aware, n-dimensional storage. 
///
/// A `SharedTensor` is used for the purpose of tracking the location of memory across devices 
/// for one similar piece of data. `SharedTensor` handles synchronization of memory of type `T`, by 
/// which it is parameterized, and provides the functionality for memory management across devices.
///
/// `SharedTensor` holds copies and their version numbers. A user can request any number of
/// immutable `Tensor`s or a single mutable `Tensor` (enforced by borrowck). It's possible to 
/// validate at runtime that tensor data is initialized when a user requests a tensor for reading
/// and skip the initialization check if a tensor is requested only for writing.
///
/// ## Terminology
///
/// In Parenchyma, multidimensional Rust arrays represent tensors. A vector, a tensor with a 
/// rank of 1, in an n-dimensional space is represented by a one-dimensional Rust array of 
/// length n. Scalars, tensors with a rank of 0, are represented by numbers (e.g., `3`). An array of 
/// arrays, such as `[[1, 2, 3], [4, 5, 6]]`, represents a tensor with a rank of 2.
///
/// A tensor is essentially a generalization of vectors. A Parenchyma shared tensor tracks the memory 
/// copies of the numeric data of a tensor across the device of the backend and manages:
///
/// * the location of these memory copies
/// * the location of the latest memory copy and
/// * the synchronization of memory copies between devices
///
/// This is important, as it provides a unified data interface for executing tensor operations 
/// on CUDA, OpenCL and common host CPU.
///
/// ## Read/Write
///
/// The methods `read`, `read_write`, and `write` use `unsafe` to extend the lifetime of the returned 
/// reference to the internally owned memory chunk. The borrowck guarantees that the shared tensor 
/// outlives all of its tensors, and that there is only one mutable borrow. 
///
/// ### TODO:
///
/// * Therefore, we only need to make sure the memory locations won't be dropped or moved while 
/// there are active tensors.
///
/// * Contexts and devices should also remain in scope, although it's unlikely that a context will
/// have the same ID as a previous context...
///
/// ### Summary
///
/// If the caller reads (`read` or `read_write`), memory is synchronized and the latest memory 
/// object is returned. If the caller mutably borrows memory (`read_write` and `write`), it's expected 
/// that the memory will be overwritten, so the other memory locations are immediately considered 
/// outdated.
#[derive(Debug)]
pub struct SharedTensor<T = f32> {
    /// The shape of the shared tensor.
    pub shape: Shape,

    /// A vector of buffers.
    ///
    /// Each entry pairs the compute device a copy lives on with the memory allocated there.
    /// The index of an entry is also its bit position in `versions`, so removing an entry
    /// (see `dealloc`) must shift the higher version bits down to match.
    copies: RefCell<Vec<(ComputeDevice, Memory<T>)>>,

    /// Indicates whether or not memory is synchronized (synchronization state).
    ///
    /// There are only two possible states:
    ///
    /// * Outdated or uninitialized
    /// * Up-to-date
    ///
    /// The _bools_ are packed into an integer and the integer can be set/reset in one operation.
    /// The integer type used is `u64` (used to store bitmasks), therefore the maximum number of 
    /// memories is 64.
    ///
    /// note: `BitSet` can be used instead (for the purpose of having multiple nodes in a cluster?) 
    /// of a single integer in exchange for some runtime cost and will likely be allowed in the 
    /// near future via a parameter at the type level or a feature flag.
    ///
    /// `u64` requires no extra allocations and no access indirection, but is limited. `BitSet` is
    /// slower.
    ///
    /// note: currently relies on the associated constant `u64Map::CAPACITY`, though there are 
    /// plans to add an associated constant or `const fn` to `u64` itself.
    ///
    /// Each time a `Tensor` is mutably borrowed from `SharedTensor`, the version map is reset so
    /// that only the borrowed copy's bit is set. A zero bit means that the memory object at that
    /// specific location is uninitialized or outdated.
    versions: u64Map,

    /// A marker for the element type `T`, which only appears inside the `RefCell`.
    phantom: PhantomData<T>,
}

impl<T> SharedTensor<T> where Device: Alloc<T> + Synch<T> {

    /// Constructs a new, empty `SharedTensor` with a shape of `sh`.
    ///
    /// No memory is allocated here; buffers are created lazily the first time a
    /// device requests access.
    pub fn new<A>(sh: A) -> Self where A: Into<Shape> {
        SharedTensor {
            shape: sh.into(),
            copies: RefCell::new(Vec::new()),
            versions: u64Map::new(),
            phantom: PhantomData,
        }
    }

    /// Constructs a new `SharedTensor` containing a `chunk` of data with a shape of `sh`.
    ///
    /// The data is written to the device associated with `con`, and that copy is
    /// marked as the sole up-to-date one (version bit 0).
    pub fn with<H, I>(con: &H, sh: I, chunk: Vec<T>) -> Result<Self>
        where H: Has<Device>,
              I: Into<Shape>,
              {

        let shape = sh.into();
        let device = con.get_ref();
        let memory = device.allocwrite(&shape, chunk)?;

        Ok(SharedTensor {
            shape,
            copies: RefCell::new(vec![(device.view(), memory)]),
            versions: u64Map::with(1),
            phantom: PhantomData,
        })
    }

    /// Allocates memory on the active device and tracks it.
    ///
    /// NOTE(review): the freshly allocated copy is marked up-to-date
    /// (`u64Map::with(1)`) even though its contents are not written here — this
    /// mirrors the original behavior (which carried a `? TODO` on that line).
    pub fn alloc<H, I>(con: &H, sh: I) -> Result<Self> 
        where H: Has<Device>, 
              I: Into<Shape> 
              {

        let shape = sh.into();
        let device = con.get_ref();
        let memory = device.alloc(&shape)?;

        Ok(SharedTensor {
            shape,
            copies: RefCell::new(vec![(device.view(), memory)]),
            versions: u64Map::with(1),
            phantom: PhantomData,
        })
    }

    /// Drops memory allocation on the specified device. Returns an error if no memory has
    /// been allocated on this device.
    ///
    // TODO FIXME: synchronize memory elsewhere if possible..?
    // TODO silence the error..?
    pub fn dealloc<H>(&mut self, con: &H) -> Result<Memory<T>> where H: Has<Device> {

        let view = con.get_ref().view();

        let index = match self.get_location_index(&view) {
            Some(index) => index,
            None => return Err(ErrorKind::AllocatedMemoryNotFoundForDevice.into()),
        };

        let (_, memory) = self.copies.borrow_mut().remove(index);

        // Delete bit `index` from the version map: bits below it are kept as-is,
        // bits above it shift down one position to match the shifted vector indices.
        let bits = self.versions.get();
        let low = (1 << index) - 1;
        self.versions.set((bits & low) | ((bits >> 1) & !low));

        Ok(memory)
    }

    /// Changes the capacity and shape of the tensor.
    ///
    /// **Caution**: Drops all copies which are not on the current device.
    ///
    /// `SharedTensor::reshape` is preferred over this method if the size of the old and new shape
    /// are identical because it will not reallocate memory.
    pub fn realloc<H, I>(&mut self, dev: &H, sh: I) -> Result 
        where H: Has<Device>, 
              I: Into<Shape> 
              {

        unimplemented!()
    }

    /// Change the shape of the Tensor.
    ///
    /// # Returns
    ///
    /// Returns an error if the size of the new shape is not equal to the size of the old shape.
    /// If you want to change the shape to one of a different size, use `SharedTensor::realloc`.
    pub fn reshape<I>(&mut self, sh: I) -> Result where I: Into<Shape> {
        let shape = sh.into();

        if shape.capacity() == self.shape.capacity() {
            self.shape = shape;
            Ok(())
        } else {
            Err(ErrorKind::InvalidReshapedTensorSize.into())
        }
    }

    /// Returns the number of elements the tensor can hold without reallocating.
    pub fn capacity(&self) -> usize {
        self.shape.capacity()
    }
}

/// This block contains the read/write/auto-sync logic.
impl<T> SharedTensor<T> where Device: Alloc<T> + Synch<T> {

    /// View an underlying tensor for reading on the active device.
    ///
    /// This method can fail if memory allocation fails or if no memory is initialized.
    /// The borrowck guarantees that the shared tensor outlives all of its tensors.
    ///
    /// Summary:
    ///
    /// 1) Check if there is initialized data anywhere
    /// 2) Lookup memory and its version for `device`, allocate it if it doesn't exist
    /// 3) Check version, if it's old, synchronize
    pub fn read<'shared, H>(&'shared self, dev: &H) -> Result<&'shared Memory<T>> 
        where H: Has<Device> {

        // `false`: a read does not invalidate the other copies, so this copy's
        // bit is merely inserted into the version map rather than replacing it.
        let i = self.autosync(dev, false)?;

        let borrowed_copies = self.copies.borrow();

        let (_, ref buffer) = borrowed_copies[i];

        // SAFETY: the `RefCell` borrow guard ends with this function, but the
        // `Memory` it points into is owned by `self.copies`, which lives at
        // least as long as `'shared` (it borrows `self`). The entry must not be
        // removed or moved while the returned reference is alive — see the
        // "Read/Write" notes in the type-level documentation.
        let memory = unsafe { extend_lifetime::<'shared>(buffer) };

        Ok(memory)
    }

    /// View an underlying tensor for reading and writing on the active device. The memory 
    /// location is set as the latest.
    ///
    /// This method can fail if memory allocation fails or if no memory is initialized.
    ///
    /// Summary:
    ///
    /// 1) Check if there is initialized data anywhere
    /// 2) Lookup memory and its version for `device`, allocate it if it doesn't exist
    /// 3) Check version, if it's old, synchronize
    /// 4) Increase memory version and latest_version
    pub fn read_write<'shared, H>(&'shared mut self, dev: &H) -> Result<&'shared mut Memory<T>> 
        where H: Has<Device> {

        // `true`: after a mutable borrow the other copies are considered
        // outdated, so the version map is reset to exactly `1 << i`.
        let i = self.autosync(dev, true)?;

        let mut borrowed_copies = self.copies.borrow_mut();

        let (_, ref mut buffer) = borrowed_copies[i];

        // SAFETY: as in `read`; additionally, `&mut self` guarantees this is
        // the only outstanding borrow of the shared tensor, so handing out a
        // unique reference is sound.
        let memory = unsafe { extend_lifetime_mut::<'shared>(buffer) };

        Ok(memory)
    }

    /// View an underlying tensor for writing only.
    ///
    /// This method skips synchronization and initialization logic since its data will
    /// be overwritten anyway. The caller must initialize all elements contained in the tensor. This
    /// convention isn't enforced, but failure to do so may result in undefined data later.
    ///
    /// Summary:
    ///
    /// 1) *Skip initialization check
    /// 2) Lookup memory and its version for `device`, allocate it if it doesn't exist
    /// 3) *Skip synchronization
    /// 4) Increase memory version and latest_version
    ///
    /// TODO
    ///
    /// * Add an `invalidate` method:
    ///
    ///     If the caller fails to overwrite memory, it must call `invalidate` to return the vector
    ///     to an uninitialized state.
    pub fn write<'shared, H>(&'shared mut self, con: &H) -> Result<&'shared mut Memory<T>>
        where H: Has<Device> {

        let i = self.get_or_create_location_index(con)?;
        // Mark this copy as the sole up-to-date one; every other copy is now
        // outdated since the caller is expected to overwrite this memory.
        self.versions.set(1 << i);

        let mut borrowed_copies = self.copies.borrow_mut();

        let (_, ref mut buffer) = borrowed_copies[i];

        // SAFETY: same reasoning as in `read_write` above.
        let memory = unsafe { extend_lifetime_mut::<'shared>(buffer) };

        Ok(memory)
    }
}

impl<T> SharedTensor<T> where Device: Alloc<T> + Synch<T> {

    /// Returns the index into `copies` of the copy located on `location`, or `None`
    /// if that device holds no copy yet.
    fn get_location_index(&self, location: &ComputeDevice) -> Option<usize> {

        for (i, l) in self.copies.borrow().iter().map(|&(ref l, _)| l).enumerate() {
            if l.eq(location) {
                return Some(i);
            }
        }

        None
    }

    /// Returns the index into `copies` for the device associated with `con`,
    /// allocating memory on that device first if it holds no copy yet.
    ///
    /// Fails with `BitMapCapacityExceeded` when `copies` already holds
    /// `u64Map::CAPACITY` (64) entries — the limit of the version bitmap.
    fn get_or_create_location_index<H>(&self, con: &H) -> Result<usize> where H: Has<Device> {

        let device = con.get_ref();

        let location = device.view();

        if let Some(i) = self.get_location_index(&location) {
            return Ok(i);
        }

        if self.copies.borrow().len() == u64Map::CAPACITY {
            return Err(ErrorKind::BitMapCapacityExceeded.into());
        }

        let memory = device.alloc(&self.shape)?;
        self.copies.borrow_mut().push((location, memory));

        Ok(self.copies.borrow().len() - 1)
    }

    /// Sync if necessary
    ///
    /// Returns the index into `copies` for the device's memory. When `tick` is `true`
    /// (mutable access), the version map is reset so that only this copy is considered
    /// up-to-date; when `false` (read access), this copy is merely added to the set of
    /// up-to-date copies.
    ///
    /// TODO: 
    ///
    /// * Choose the best source to copy data from.
    ///      That would require some additional traits that return costs for transferring data 
    ///      between different backends.
    ///
    /// note: Typically, there would be transfers between `Native` <-> `GPU` in foreseeable 
    /// future, so it's best to not over-engineer here.
    pub fn autosync<H>(&self, dev: &H, tick: bool) -> Result<usize> where H: Has<Device> {
        if self.versions.empty() {
            return Err(ErrorKind::UninitializedMemory.into());
        }

        let i = self.get_or_create_location_index(dev)?;
        self.autosync_(i)?;

        if tick {
            self.versions.set(1 << i);
        } else {
            self.versions.insert(i);
        }

        Ok(i)
    }

    /// Transfers up-to-date data into the copy at `destination_index`, doing
    /// nothing when that copy is already current.
    fn autosync_(&self, destination_index: usize) -> Result {

        if self.versions.contains(destination_index) {

            return Ok(());
        }

        // `latest` is the position of the lowest set bit; it can only be 64
        // (`CAPACITY`) when the map is empty, which `autosync` has already ruled out.
        let source_index = self.versions.latest() as usize;
        assert_ne!(source_index, u64Map::CAPACITY);

        // We need to borrow two different Vec elements: `src` and `mut dst`.
        // Borrowck doesn't allow to do it in a straightforward way, so here is workaround.

        // The destination is not up-to-date (checked above) while the source is,
        // so the two indices can never coincide.
        assert_ne!(source_index, destination_index);

        let mut borrowed_copies = self.copies.borrow_mut();

        let (source, mut destination) = {
            // Split the vector so the two elements land in disjoint halves; the
            // element at the split point becomes the first element of the right half.
            if source_index < destination_index {
                let (left, right) = borrowed_copies.split_at_mut(destination_index);
                (&left[source_index], &mut right[0])
            } else {
                let (left, right) = borrowed_copies.split_at_mut(source_index);
                (&right[0], &mut left[destination_index])
            }
        };

        // TODO refactor

        // Backends may define transfers asymmetrically. E.g. CUDA may know how to transfer to and 
        // from Native backend, while Native may know nothing about CUDA at all. So if first 
        // attempt fails we change order and try again.
        match source.0.device().read(&source.1, &mut destination.0, &mut destination.1) {
            Err(ref e) if e.kind() == ErrorKind::NoAvailableSynchronizationRouteFound => { },
            ret @ _ => return ret,
        }

        destination.0.device().write(&mut destination.1, &source.0, &source.1)

        // TODO: try transfer indirectly via Native backend
    }
}

/// Describes the shape of a tensor.
#[derive(Clone, Debug)]
pub struct Shape {
    /// The number of components.
    ///
    /// This is the product of all dimensions in `dims` (see the `From` conversions below).
    ///
    /// # Example
    ///
    /// ```{.text}
    /// // The following tensor has 9 components
    ///
    /// [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    /// ```
    pub capacity: usize,
    /// The total number of indices.
    ///
    /// NOTE(review): `rank` is written by the `From` conversions but never read within
    /// this module — confirm external use before relying on it.
    ///
    /// # Example
    ///
    /// The following tensor has a rank of 2:
    ///
    /// ```{.text}
    /// [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    /// ```
    rank: usize,
    /// The dimensions of the tensor, e.g. `[3, 3]` for the matrix above.
    pub dims: Vec<usize>,
}

impl Shape {

    /// Returns the total number of components the described tensor holds.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
}

impl From<usize> for Shape {

    fn from(n: usize) -> Shape {
        [n].into()
    }
}

impl From<[usize; 1]> for Shape {

    /// Converts a one-element array into the shape of a rank-1 tensor.
    fn from(array: [usize; 1]) -> Shape {
        // `capacity` is the product of the dimensions. For a rank-1 shape this
        // equals the single dimension; computing it with `product` keeps this
        // conversion consistent with the rank-2 and rank-3 implementations.
        let capacity: usize = array.iter().product();
        let rank = 1;
        let dims = array.to_vec();

        Shape { capacity, rank, dims }
    }
}

impl From<[usize; 2]> for Shape {

    /// Converts a two-element array into the shape of a rank-2 tensor.
    fn from(array: [usize; 2]) -> Shape {
        // The capacity is the product of the two dimensions.
        let capacity: usize = array.iter().product();

        Shape { capacity, rank: 2, dims: array.to_vec() }
    }
}

impl From<[usize; 3]> for Shape {

    /// Converts a three-element array into the shape of a rank-3 tensor.
    fn from(array: [usize; 3]) -> Shape {
        // The capacity is the product of the three dimensions.
        let capacity: usize = array.iter().product();

        Shape { capacity, rank: 3, dims: array.to_vec() }
    }
}

/// A "newtype" with an internal type of `Cell<u64>`. `u64Map` uses [bit manipulation][1] to manage 
/// memory versions.
///
/// Bit `k` being set means the copy at index `k` of `SharedTensor::copies` is up-to-date;
/// a zero bit means that copy is outdated or uninitialized.
///
/// [1]: http://stackoverflow.com/a/141873/2561805
#[allow(non_camel_case_types)]
#[derive(Debug)]
pub struct u64Map(Cell<u64>);

impl u64Map {
    /// The maximum number of bits the bit map can contain.
    const CAPACITY: usize = 64;

    /// Constructs a new, empty `u64Map`.
    fn new() -> u64Map {
        u64Map::with(0)
    }

    /// Constructs a new `u64Map` from the raw bit pattern `n`.
    fn with(n: u64) -> u64Map {
        u64Map(Cell::new(n))
    }

    /// Returns the raw bit pattern.
    fn get(&self) -> u64 {
        self.0.get()
    }

    /// Replaces the entire bit pattern with `v` in a single operation.
    fn set(&self, v: u64) {
        self.0.set(v)
    }

    /// Returns `true` if no bit is set.
    fn empty(&self) -> bool {
        self.0.get() == 0
    }

    /// Sets the `k`-th bit.
    ///
    /// `k` must be less than `CAPACITY`: shifting by 64 or more panics in debug builds
    /// and silently sets the wrong bit in release builds (the shift amount is masked),
    /// so the invariant is checked explicitly here. Callers uphold it via the capacity
    /// check in `get_or_create_location_index`.
    fn insert(&self, k: usize) {
        debug_assert!(k < Self::CAPACITY, "bit index out of range: {}", k);
        self.0.set(self.0.get() | (1 << k))
    }

    /// Returns `true` if the `k`-th bit is set; out-of-range `k` is simply `false`.
    fn contains(&self, k: usize) -> bool {
        k < Self::CAPACITY && (self.0.get() & (1 << k) != 0)
    }

    /// Returns the position of the lowest set bit, i.e. the first up-to-date copy.
    ///
    /// Returns `CAPACITY` (64) when the map is empty — callers must rule that out
    /// first (see the `empty` check and the assert in `autosync`/`autosync_`).
    fn latest(&self) -> u32 {
        self.0.get().trailing_zeros()
    }
}

/// Extends the lifetime `'a` of an immutable reference to an arbitrary `'b`.
///
/// # Safety
///
/// The caller must guarantee that the referent outlives `'b` and is neither moved
/// nor dropped while the returned reference is in use.
unsafe fn extend_lifetime<'a, 'b, T>(t: &'a T) -> &'b T {
    // Both lifetimes are fixed by the signature, so the transmute's source and
    // target types are inferred; only the lifetime is changed.
    mem::transmute(t)
}

/// Extends the lifetime `'a` of a mutable reference to an arbitrary `'b`.
///
/// # Safety
///
/// The caller must guarantee that the referent outlives `'b`, is neither moved nor
/// dropped while the returned reference is in use, and that no other reference to
/// it exists for the duration of `'b` (uniqueness of `&mut`).
unsafe fn extend_lifetime_mut<'a, 'b, T>(t: &'a mut T) -> &'b mut T {
    // Only the lifetime changes; source and target types are inferred from the
    // signature.
    mem::transmute(t)
}