use super::OpError;
use super::OpResult;
use crate::ctx::Ctx;
use crate::incomplete_token::IncompleteToken;
use crate::object::calc_object_layout;
use crate::object::ObjectState;
use crate::util::ceil_pow2;
use crate::util::div_pow2;
use crate::util::is_multiple_of_pow2;
use futures::Stream;
use futures::StreamExt;
use itertools::Itertools;
use off64::u32;
use off64::u64;
use off64::usz;
use std::cmp::max;
use std::cmp::min;
use std::error::Error;
use std::iter::empty;
use std::sync::Arc;
use tracing::trace;
use tracing::warn;

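/// Request to write a contiguous range of an incomplete object's data.
///
/// `offset` must be aligned to the lpage size, and the write must cover either one full lpage or
/// everything from `offset` to the end of the object. `data_len` declares how many bytes
/// `data_stream` will yield; the stream may yield them in arbitrarily sized chunks.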
pub struct OpWriteObjectInput<
  D: AsRef<[u8]>,
  S: Unpin + Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
> {
  pub incomplete_token: IncompleteToken,
  pub offset: u64,
  pub data_len: u64,
  pub data_stream: S,
}

pub struct OpWriteObjectOutput {}

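/// Writes one lpage-aligned range of an incomplete object.
///
/// Validates the requested range, maps it to the object's device pages, then streams the data in
/// and writes it out page by page, syncing the device once at the end.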
pub(crate) async fn op_write_object<
  D: AsRef<[u8]>,
  S: Unpin + Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
>(
  ctx: Arc<Ctx>,
  mut req: OpWriteObjectInput<D, S>,
) -> OpResult<OpWriteObjectOutput> {
  let len = req.data_len;
  let IncompleteToken { object_id, .. } = req.incomplete_token;

  let Some(obj) = ctx.incomplete_objects.read().get(&object_id).cloned() else {
    return Err(OpError::ObjectNotFound);
  };

  if !is_multiple_of_pow2(req.offset, ctx.pages.lpage_size_pow2) {
    // Write offsets must be aligned to the lpage size.
    return Err(OpError::UnalignedWrite);
  };
  if len > ctx.pages.lpage_size() {
    // Cannot write more than one lpage's worth of data in a single request.
    return Err(OpError::InexactWriteLength);
  };

  if req.offset + len > obj.size {
    // Write range extends past the end of the object.
    return Err(OpError::RangeOutOfBounds);
  };

  if req.offset + len != min(req.offset + ctx.pages.lpage_size(), obj.size) {
    // The write must fill either an entire lpage or the entire tail; partial fills would leave uninitialised data exposed.
    return Err(OpError::InexactWriteLength);
  };

  let layout = calc_object_layout(&ctx.pages, obj.size);
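  // `layout` describes how the object's data is spread across pages: `lpage_count` full lpages
  // followed by tail pages of power-of-two sizes.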

  // Vec of (page_size, page_dev_offset).
  let write_dev_offsets = {
    let idx = u32!(div_pow2(req.offset, ctx.pages.lpage_size_pow2));
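    // `idx` is the lpage index the write starts at; an index past the last full lpage means this
    // write covers the tail pages.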
    if idx < layout.lpage_count {
      vec![(ctx.pages.lpage_size(), obj.lpage_dev_offsets[usz!(idx)])]
    } else {
      let mut offsets = layout
        .tail_page_sizes_pow2
        .into_iter()
        .map(|(i, sz)| (1 << sz, obj.tail_page_dev_offsets[usz!(i)]))
        .collect_vec();
      // The very last tail page may not be fully written, unless the object size happens to be an exact multiple of that page's size. Use `if let` as there may not be any tail pages at all.
      if let Some((page_size, _page_dev_offset)) = offsets.last_mut() {
        let mod_ = obj.size % *page_size;
        if mod_ != 0 {
          *page_size = mod_;
        };
      };
      offsets
    }
  };

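  // Stream the data in, buffering until there's enough to fill the next page, then write that page
  // at its device offset. Pages are written strictly in order.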
  let mut written = 0;
  let mut write_page_idx = 0;
  let mut buf = Vec::new();
  let mut stream_ended = false;
  loop {
    if !stream_ended {
      match req.data_stream.next().await {
        Some(chunk) => buf.extend_from_slice(
          chunk
            .map_err(|err| OpError::DataStreamError(Box::from(err)))?
            .as_ref(),
        ),
        None => stream_ended = true,
      };
    };
    if stream_ended && buf.is_empty() {
      // Stream has ended and buffer has been fully consumed.
      break;
    };
    let buf_len = u64!(buf.len());
    if written + buf_len > len {
      warn!(
        received = written + buf_len,
        declared = len,
        "stream provided more data than declared"
      );
      return Err(OpError::DataStreamLengthMismatch);
    };

    // TODO: We could flush more frequently instead of buffering an entire page when the page is larger than a single SSD page/block write. If the page is smaller, the I/O layer (e.g. mmap) would be doing the buffering and repeated writes anyway.
    let (amount_to_write, page_dev_offset) = write_dev_offsets[write_page_idx];
    if buf_len < amount_to_write {
      if stream_ended {
        // The stream has ended but the remaining buffered data doesn't fill the next page; bail out so the final length check reports the shortfall.
        break;
      };
      continue;
    };
    {
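      // Take the object's write lock for each page write; this fails if the object is no longer in
      // the Incomplete state (e.g. it was committed or deleted concurrently).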
      let lock = obj
        .lock_for_writing_if_still_valid(ObjectState::Incomplete)
        .await?;
      // Optimisation: fdatasync at end of all writes instead of here.
      // We can't use `allocate_from_data` as it won't be sized correctly.
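      // Round the buffer up to a whole number of spages (at least one), since `amount_to_write` may
      // not be spage-aligned.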
      let mut write_data = ctx.pages.allocate_uninitialised(max(
        ctx.pages.spage_size(),
        ceil_pow2(amount_to_write, ctx.pages.spage_size_pow2),
      ));
      write_data[..usz!(amount_to_write)].copy_from_slice(&buf[..usz!(amount_to_write)]);
      buf.splice(..usz!(amount_to_write), empty());
      ctx.device.write_at(page_dev_offset, write_data).await;
      trace!(
        object_id,
        previously_written = written,
        page_write_amount = amount_to_write,
        "wrote page"
      );
      written += amount_to_write;
      write_page_idx += 1;
      drop(lock);
    };
  }
  if written != len {
    warn!(
      received = written,
      declared = len,
      "stream provided less data than declared"
    );
    return Err(OpError::DataStreamLengthMismatch);
  };

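  // All page writes have completed; sync once so the data is durable (see the optimisation note above).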
  ctx.device.sync().await;

  Ok(OpWriteObjectOutput {})
}