pravega_client/byte/
reader.rs1use crate::client_factory::ClientFactoryAsync;
12use crate::segment::metadata::SegmentMetadataClient;
13use crate::segment::reader::PrefetchingAsyncSegmentReader;
14
15use pravega_client_shared::{ScopedSegment, ScopedStream};
16
17use std::convert::TryInto;
18use std::io::{Error, ErrorKind, SeekFrom};
19use std::sync::Arc;
20use uuid::Uuid;
21
22pub struct ByteReader {
67 reader_id: Uuid,
68 pub segment: ScopedSegment,
69 reader: Option<PrefetchingAsyncSegmentReader>,
70 reader_buffer_size: usize,
71 metadata_client: SegmentMetadataClient,
72 factory: ClientFactoryAsync,
73}
74
75impl ByteReader {
76 pub(crate) async fn new(stream: ScopedStream, factory: ClientFactoryAsync, buffer_size: usize) -> Self {
77 let segments = factory
78 .controller_client()
79 .get_head_segments(&stream)
80 .await
81 .expect("get head segments");
82 assert_eq!(
83 segments.len(),
84 1,
85 "Byte stream is configured with more than one segment"
86 );
87 let segment = segments.iter().next().unwrap().0.clone();
88 let scoped_segment = ScopedSegment {
89 scope: stream.scope.clone(),
90 stream: stream.stream.clone(),
91 segment,
92 };
93 let async_reader = factory.create_async_segment_reader(scoped_segment.clone()).await;
94 let async_reader_wrapper = PrefetchingAsyncSegmentReader::new(
95 factory.runtime_handle(),
96 Arc::new(Box::new(async_reader)),
97 0,
98 buffer_size,
99 );
100 let metadata_client = factory
101 .create_segment_metadata_client(scoped_segment.clone())
102 .await;
103 ByteReader {
104 reader_id: Uuid::new_v4(),
105 segment: scoped_segment,
106 reader: Some(async_reader_wrapper),
107 reader_buffer_size: buffer_size,
108 metadata_client,
109 factory,
110 }
111 }
112
113 pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
121 self.reader
122 .as_mut()
123 .unwrap()
124 .read(buf)
125 .await
126 .map_err(|e| Error::new(ErrorKind::Other, format!("Error: {:?}", e)))
127 }
128
129 pub async fn current_head(&self) -> std::io::Result<u64> {
139 self.metadata_client
140 .fetch_current_starting_head()
141 .await
142 .map(|i| i as u64)
143 .map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e)))
144 }
145
146 pub async fn current_tail(&self) -> std::io::Result<u64> {
153 self.metadata_client
154 .fetch_current_segment_length()
155 .await
156 .map(|i| i as u64)
157 .map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e)))
158 }
159
160 pub fn current_offset(&self) -> u64 {
167 self.reader.as_ref().unwrap().offset as u64
168 }
169
170 pub fn available(&self) -> usize {
178 self.reader.as_ref().unwrap().available()
179 }
180
181 pub async fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
185 match pos {
186 SeekFrom::Start(offset) => {
187 let offset = offset.try_into().map_err(|e| {
188 Error::new(
189 ErrorKind::InvalidInput,
190 format!("Overflowed when converting offset to i64: {:?}", e),
191 )
192 })?;
193 self.recreate_reader_wrapper(offset);
194 Ok(offset as u64)
195 }
196 SeekFrom::Current(offset) => {
197 let new_offset = self.reader.as_ref().unwrap().offset + offset;
198 if new_offset < 0 {
199 Err(Error::new(
200 ErrorKind::InvalidInput,
201 "Cannot seek to a negative offset",
202 ))
203 } else {
204 self.recreate_reader_wrapper(new_offset);
205 Ok(new_offset as u64)
206 }
207 }
208 SeekFrom::End(offset) => {
209 let tail = self
210 .metadata_client
211 .fetch_current_segment_length()
212 .await
213 .map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e)))?;
214 if tail + offset < 0 {
215 Err(Error::new(
216 ErrorKind::InvalidInput,
217 "Cannot seek to a negative offset",
218 ))
219 } else {
220 let new_offset = tail + offset;
221 self.recreate_reader_wrapper(new_offset);
222 Ok(new_offset as u64)
223 }
224 }
225 }
226 }
227
228 fn recreate_reader_wrapper(&mut self, offset: i64) {
229 let internal_reader = self.reader.take().unwrap().extract_reader();
230 let new_reader_wrapper = PrefetchingAsyncSegmentReader::new(
231 self.factory.runtime_handle(),
232 internal_reader,
233 offset,
234 self.reader_buffer_size,
235 );
236 self.reader = Some(new_reader_wrapper);
237 }
238}
239
240#[cfg(test)]
241mod test {
242 use super::*;
243 use crate::byte::writer::ByteWriter;
244 use crate::client_factory::ClientFactory;
245 use crate::util::create_stream;
246 use pravega_client_config::connection_type::{ConnectionType, MockType};
247 use pravega_client_config::ClientConfigBuilder;
248 use pravega_client_shared::PravegaNodeUri;
249 use tokio::runtime::Runtime;
250
251 #[test]
252 fn test_byte_seek() {
253 let (mut writer, mut reader, factory) = create_reader_and_writer(Runtime::new().unwrap());
254 let rt = factory.runtime();
255 let payload = vec![1; 200];
257 rt.block_on(writer.write(&payload)).expect("write");
258 rt.block_on(writer.flush()).expect("flush");
259
260 let mut buf = vec![0; 200];
262 let mut read = 0;
263 while read != 200 {
264 let r = rt.block_on(reader.read(&mut buf)).expect("read");
265 read += r;
266 }
267 assert_eq!(read, 200);
268 assert_eq!(buf, vec![1; 200]);
269 rt.block_on(reader.seek(SeekFrom::Start(0)))
271 .expect("seek to head");
272 assert_eq!(reader.current_offset(), 0);
273
274 rt.block_on(reader.seek(SeekFrom::Start(100)))
276 .expect("seek to head");
277 assert_eq!(reader.current_offset(), 100);
278
279 assert_eq!(reader.current_offset(), 100);
281 rt.block_on(reader.seek(SeekFrom::Current(100)))
282 .expect("seek to current");
283 assert_eq!(reader.current_offset(), 200);
284
285 rt.block_on(reader.seek(SeekFrom::Current(-100)))
287 .expect("seek to current");
288 assert_eq!(reader.current_offset(), 100);
289
290 assert!(rt.block_on(reader.seek(SeekFrom::Current(-200))).is_err());
292
293 rt.block_on(reader.seek(SeekFrom::End(0))).expect("seek to end");
295 assert_eq!(reader.current_offset(), 200);
296
297 assert!(rt.block_on(reader.seek(SeekFrom::End(1))).is_ok());
299
300 rt.block_on(reader.seek(SeekFrom::End(-100)))
302 .expect("seek to end");
303 assert_eq!(reader.current_offset(), 100);
304
305 assert!(rt.block_on(reader.seek(SeekFrom::End(-300))).is_err());
307 }
308
309 #[test]
310 fn test_byte_stream_truncate() {
311 let (mut writer, mut reader, factory) = create_reader_and_writer(Runtime::new().unwrap());
312 let rt = factory.runtime();
313 let payload = vec![1; 200];
315 rt.block_on(writer.write(&payload)).expect("write");
316 rt.block_on(writer.flush()).expect("flush");
317
318 rt.block_on(writer.truncate_data_before(100)).expect("truncate");
320
321 rt.block_on(reader.seek(SeekFrom::Start(0)))
323 .expect("seek to head");
324 let mut buf = vec![0; 100];
325 assert!(rt.block_on(reader.read(&mut buf)).is_err());
326
327 let offset = rt.block_on(reader.current_head()).expect("get current head");
329 rt.block_on(reader.seek(SeekFrom::Start(offset)))
330 .expect("seek to new head");
331 let mut buf = vec![0; 100];
332 assert!(rt.block_on(reader.read(&mut buf)).is_ok());
333 assert_eq!(buf, vec![1; 100]);
334 }
335
336 #[test]
337 fn test_byte_stream_seal() {
338 const BYTE_SIZE: usize = 200;
339
340 let (mut writer, mut reader, factory) = create_reader_and_writer(Runtime::new().unwrap());
341 let rt = factory.runtime();
342
343 let payload = vec![1; BYTE_SIZE];
345 rt.block_on(writer.write(&payload)).expect("write");
346 rt.block_on(writer.flush()).expect("flush");
347
348 rt.block_on(writer.seal()).expect("seal");
350
351 rt.block_on(reader.seek(SeekFrom::Start(0)))
353 .expect("seek to new head");
354 let mut buf = vec![0; BYTE_SIZE];
355 assert!(rt.block_on(reader.read(&mut buf)).is_ok());
356 assert_eq!(buf, vec![1; BYTE_SIZE]);
357
358 let payload = vec![1; BYTE_SIZE];
359 let write_result = rt.block_on(writer.write(&payload));
360 let flush_result = rt.block_on(writer.flush());
361 assert!(write_result.is_err() || flush_result.is_err());
362 }
363
364 #[test]
365 #[should_panic(expected = "Byte stream is configured with more than one segment")]
366 fn test_invalid_stream_config() {
367 let config = ClientConfigBuilder::default()
368 .connection_type(ConnectionType::Mock(MockType::Happy))
369 .mock(true)
370 .controller_uri(PravegaNodeUri::from("127.0.0.2:9091".to_string()))
371 .build()
372 .unwrap();
373 let factory = ClientFactory::new(config);
374 factory.runtime().block_on(create_stream(
375 &factory,
376 "testScopeInvalid",
377 "testStreamInvalid",
378 2,
379 ));
380 let stream = ScopedStream::from("testScopeInvalid/testStreamInvalid");
381 factory.runtime().block_on(factory.create_byte_reader(stream));
382 }
383
384 fn create_reader_and_writer(runtime: Runtime) -> (ByteWriter, ByteReader, ClientFactory) {
385 let config = ClientConfigBuilder::default()
386 .connection_type(ConnectionType::Mock(MockType::Happy))
387 .mock(true)
388 .controller_uri(PravegaNodeUri::from("127.0.0.2:9091".to_string()))
389 .build()
390 .unwrap();
391 let factory = ClientFactory::new_with_runtime(config, runtime);
392 factory
393 .runtime()
394 .block_on(create_stream(&factory, "testScope", "testStream", 1));
395 let stream = ScopedStream::from("testScope/testStream");
396 let writer = factory
397 .runtime()
398 .block_on(factory.create_byte_writer(stream.clone()));
399 let reader = factory.runtime().block_on(factory.create_byte_reader(stream));
400 (writer, reader, factory)
401 }
402}