1#![allow(bare_trait_objects)]
11
12use crate::byte::reader::ByteReader;
17use crate::byte::writer::ByteWriter;
18use crate::event::reader_group::{ReaderGroup, ReaderGroupConfig, ReaderGroupConfigBuilder};
19use crate::event::transactional_writer::TransactionalEventWriter;
20use crate::event::writer::EventWriter;
21use crate::segment::metadata::SegmentMetadataClient;
22use crate::segment::raw_client::RawClientImpl;
23use crate::segment::reader::AsyncSegmentReaderImpl;
24use crate::sync::synchronizer::Synchronizer;
25use crate::sync::table::{Table, TableError};
26cfg_if::cfg_if! {
27 if #[cfg(feature = "integration-test")] {
28 use crate::test_utils::{RawClientWrapper, SegmentReaderWrapper};
29 }
30}
31
32use crate::index::Fields;
33use pravega_client_auth::DelegationTokenProvider;
34use pravega_client_config::ClientConfig;
35use pravega_client_shared::{DelegationToken, PravegaNodeUri, Scope, ScopedSegment, ScopedStream, WriterId};
36use pravega_connection_pool::connection_pool::ConnectionPool;
37use pravega_controller_client::mock_controller::MockController;
38use pravega_controller_client::{ControllerClient, ControllerClientImpl};
39use pravega_wire_protocol::connection_factory::{
40 ConnectionFactory, ConnectionFactoryConfig, SegmentConnectionManager,
41};
42
43use crate::index::{IndexReader, IndexWriter};
44use crate::util::meta::MetaClient;
45use std::fmt;
46use std::fmt::Debug;
47use std::sync::Arc;
48use tokio::runtime::{Handle, Runtime};
49use tracing::info;
50
51pub struct ClientFactory {
91 runtime: Runtime,
92 client_factory_async: ClientFactoryAsync,
93}
94
95impl ClientFactory {
96 pub fn new(config: ClientConfig) -> ClientFactory {
97 let rt = tokio::runtime::Runtime::new().expect("create runtime");
98 ClientFactory::new_with_runtime(config, rt)
99 }
100
101 pub fn new_with_runtime(config: ClientConfig, rt: Runtime) -> ClientFactory {
102 let async_factory = ClientFactoryAsync::new(config, rt.handle().clone());
103 ClientFactory {
104 runtime: rt,
105 client_factory_async: async_factory,
106 }
107 }
108
109 pub fn runtime(&self) -> &Runtime {
110 &self.runtime
111 }
112
113 pub fn runtime_handle(&self) -> Handle {
114 self.runtime.handle().clone()
115 }
116
117 pub fn config(&self) -> &ClientConfig {
118 self.client_factory_async.config()
119 }
120
121 pub fn controller_client(&self) -> &dyn ControllerClient {
122 self.client_factory_async.controller_client()
123 }
124
125 pub fn create_event_writer(&self, stream: ScopedStream) -> EventWriter {
126 self.client_factory_async.create_event_writer(stream)
127 }
128
129 pub async fn create_reader_group(&self, reader_group_name: String, stream: ScopedStream) -> ReaderGroup {
130 info!(
131 "Creating reader group {:?} to read data from stream {:?}",
132 reader_group_name, stream
133 );
134 self.client_factory_async
135 .create_reader_group(reader_group_name, stream)
136 .await
137 }
138
139 pub async fn create_reader_group_with_config(
143 &self,
144 reader_group_name: String,
145 reader_group_config: ReaderGroupConfig,
146 scope: Scope,
147 ) -> ReaderGroup {
148 info!(
149 "Creating reader group {:?} to read data from streams {:?}",
150 reader_group_name,
151 reader_group_config.get_streams()
152 );
153 self.client_factory_async
154 .create_reader_group_with_config(scope, reader_group_name, reader_group_config)
155 .await
156 }
157
158 pub async fn delete_reader_group(
162 &self,
163 scope: Scope,
164 reader_group_name: String,
165 ) -> Result<(), TableError> {
166 info!(
167 "Deleting reader group {:?} under scope {:?}",
168 reader_group_name, scope
169 );
170 self.client_factory_async
171 .delete_reader_group(scope, reader_group_name)
172 .await
173 }
174
175 pub async fn create_transactional_event_writer(
176 &self,
177 stream: ScopedStream,
178 writer_id: WriterId,
179 ) -> TransactionalEventWriter {
180 self.client_factory_async
181 .create_transactional_event_writer(stream, writer_id)
182 .await
183 }
184
185 pub async fn create_byte_writer(&self, stream: ScopedStream) -> ByteWriter {
186 self.client_factory_async.create_byte_writer(stream).await
187 }
188
189 pub async fn create_byte_reader(&self, stream: ScopedStream) -> ByteReader {
190 self.client_factory_async.create_byte_reader(stream).await
191 }
192
193 pub async fn create_index_writer<T: Fields + PartialOrd + PartialEq + Debug>(
194 &self,
195 stream: ScopedStream,
196 ) -> IndexWriter<T> {
197 self.client_factory_async.create_index_writer(stream).await
198 }
199
200 pub async fn create_index_reader(&self, stream: ScopedStream) -> IndexReader {
201 self.client_factory_async.create_index_reader(stream).await
202 }
203
204 pub async fn create_table(&self, scope: Scope, name: String) -> Table {
205 self.client_factory_async.create_table(scope, name).await
206 }
207
208 pub async fn create_synchronizer(&self, scope: Scope, name: String) -> Synchronizer {
209 self.client_factory_async.create_synchronizer(scope, name).await
210 }
211
212 pub fn to_async(&self) -> ClientFactoryAsync {
213 self.client_factory_async.clone()
214 }
215
216 pub(crate) async fn create_async_segment_reader(&self, segment: ScopedSegment) -> AsyncSegmentReaderImpl {
217 self.client_factory_async
218 .create_async_segment_reader(segment)
219 .await
220 }
221
222 pub(crate) async fn create_raw_client(&self, segment: &ScopedSegment) -> RawClientImpl<'_> {
223 self.client_factory_async.create_raw_client(segment).await
224 }
225
226 pub(crate) fn create_raw_client_for_endpoint(&self, endpoint: PravegaNodeUri) -> RawClientImpl<'_> {
227 self.client_factory_async.create_raw_client_for_endpoint(endpoint)
228 }
229
230 pub(crate) async fn create_delegation_token_provider(
231 &self,
232 stream: ScopedStream,
233 ) -> DelegationTokenProvider {
234 self.client_factory_async
235 .create_delegation_token_provider(stream)
236 .await
237 }
238
239 pub(crate) async fn create_segment_metadata_client(
240 &self,
241 segment: ScopedSegment,
242 ) -> SegmentMetadataClient {
243 self.client_factory_async
244 .create_segment_metadata_client(segment)
245 .await
246 }
247
248 #[doc(hidden)]
249 #[cfg(feature = "integration-test")]
250 pub async fn create_raw_client_wrapper(&self, segment: &ScopedSegment) -> RawClientWrapper<'_> {
251 let endpoint = self
252 .client_factory_async
253 .controller_client
254 .get_endpoint_for_segment(segment)
255 .await
256 .expect("get endpoint for segment");
257 RawClientWrapper::new(
258 &self.client_factory_async.connection_pool,
259 endpoint,
260 self.client_factory_async.config.request_timeout,
261 )
262 }
263
264 #[doc(hidden)]
265 #[cfg(feature = "integration-test")]
266 pub async fn create_segment_reader_wrapper(&self, segment: ScopedSegment) -> SegmentReaderWrapper {
267 SegmentReaderWrapper::new(
268 segment.clone(),
269 self.to_async(),
270 self.client_factory_async
271 .create_delegation_token_provider(ScopedStream::from(&segment))
272 .await,
273 )
274 .await
275 }
276}
277
278#[derive(Clone)]
279pub struct ClientFactoryAsync {
280 connection_pool: Arc<ConnectionPool<SegmentConnectionManager>>,
281 controller_client: Arc<Box<dyn ControllerClient>>,
282 config: Arc<ClientConfig>,
283 runtime_handle: Handle,
284}
285
286impl ClientFactoryAsync {
287 pub fn new(config: ClientConfig, handle: Handle) -> Self {
288 let cf = ConnectionFactory::create(ConnectionFactoryConfig::from(&config));
289 let pool = ConnectionPool::new(SegmentConnectionManager::new(cf, config.max_connections_in_pool));
290 let controller = if config.mock {
291 Box::new(MockController::new(config.controller_uri.clone())) as Box<dyn ControllerClient>
292 } else {
293 Box::new(ControllerClientImpl::new(config.clone(), &handle)) as Box<dyn ControllerClient>
294 };
295 ClientFactoryAsync {
296 connection_pool: Arc::new(pool),
297 controller_client: Arc::new(controller),
298 config: Arc::new(config),
299 runtime_handle: handle,
300 }
301 }
302 pub fn config(&self) -> &ClientConfig {
303 &self.config
304 }
305
306 pub fn create_event_writer(&self, stream: ScopedStream) -> EventWriter {
307 EventWriter::new(stream, self.clone())
308 }
309
310 pub async fn create_stream_meta_client(&self, stream: ScopedStream) -> MetaClient {
311 MetaClient::new(stream, self.clone())
312 }
313
314 pub async fn create_reader_group(&self, reader_group_name: String, stream: ScopedStream) -> ReaderGroup {
319 let scope = stream.scope.clone();
320 let rg_config = ReaderGroupConfigBuilder::default().add_stream(stream).build();
321 ReaderGroup::create(scope, reader_group_name, rg_config, self.clone()).await
322 }
323
324 pub async fn create_reader_group_with_config(
328 &self,
329 scope: Scope,
330 reader_group_name: String,
331 rg_config: ReaderGroupConfig,
332 ) -> ReaderGroup {
333 ReaderGroup::create(scope, reader_group_name, rg_config, self.clone()).await
334 }
335
336 pub async fn delete_reader_group(
340 &self,
341 scope: Scope,
342 reader_group_name: String,
343 ) -> Result<(), TableError> {
344 ReaderGroup::delete(scope, reader_group_name, self.clone()).await
345 }
346
347 pub async fn create_transactional_event_writer(
348 &self,
349 stream: ScopedStream,
350 writer_id: WriterId,
351 ) -> TransactionalEventWriter {
352 TransactionalEventWriter::new(stream, writer_id, self.clone()).await
353 }
354
355 pub async fn create_byte_writer(&self, stream: ScopedStream) -> ByteWriter {
356 ByteWriter::new(stream, self.clone()).await
357 }
358
359 pub async fn create_byte_reader(&self, stream: ScopedStream) -> ByteReader {
360 ByteReader::new(stream, self.clone(), self.config().reader_wrapper_buffer_size()).await
361 }
362
363 pub async fn create_index_writer<T: Fields + PartialOrd + PartialEq + Debug>(
364 &self,
365 stream: ScopedStream,
366 ) -> IndexWriter<T> {
367 IndexWriter::new(self.clone(), stream).await
368 }
369
370 pub async fn create_index_reader(&self, stream: ScopedStream) -> IndexReader {
371 IndexReader::new(self.clone(), stream).await
372 }
373
374 pub async fn create_table(&self, scope: Scope, name: String) -> Table {
375 Table::new(scope, name, self.clone())
376 .await
377 .expect("Failed to create Table map")
378 }
379
380 pub async fn create_synchronizer(&self, scope: Scope, name: String) -> Synchronizer {
381 Synchronizer::new(scope, name, self.clone()).await
382 }
383
384 pub fn controller_client(&self) -> &dyn ControllerClient {
385 &**self.controller_client
386 }
387
388 pub fn runtime_handle(&self) -> Handle {
389 self.runtime_handle.clone()
390 }
391
392 pub(crate) async fn create_async_segment_reader(&self, segment: ScopedSegment) -> AsyncSegmentReaderImpl {
393 AsyncSegmentReaderImpl::new(
394 segment.clone(),
395 self.clone(),
396 self.create_delegation_token_provider(ScopedStream::from(&segment))
397 .await,
398 )
399 .await
400 }
401
402 pub(crate) async fn create_raw_client(&self, segment: &ScopedSegment) -> RawClientImpl<'_> {
403 let endpoint = self
404 .controller_client
405 .get_endpoint_for_segment(segment)
406 .await
407 .expect("get endpoint for segment");
408 RawClientImpl::new(&self.connection_pool, endpoint, self.config.request_timeout)
409 }
410
411 pub(crate) fn create_raw_client_for_endpoint(&self, endpoint: PravegaNodeUri) -> RawClientImpl<'_> {
412 RawClientImpl::new(&self.connection_pool, endpoint, self.config.request_timeout)
413 }
414
415 pub(crate) async fn create_segment_metadata_client(
416 &self,
417 segment: ScopedSegment,
418 ) -> SegmentMetadataClient {
419 SegmentMetadataClient::new(segment, self.clone()).await
420 }
421
422 pub(crate) async fn create_delegation_token_provider(
423 &self,
424 stream: ScopedStream,
425 ) -> DelegationTokenProvider {
426 let token_provider = DelegationTokenProvider::new(stream);
427 if !self.config.is_auth_enabled {
428 let empty_token = DelegationToken::new("".to_string(), None);
429 token_provider.populate(empty_token).await;
430 }
431 token_provider
432 }
433
434 pub(crate) fn get_connection_pool(&self) -> &ConnectionPool<SegmentConnectionManager> {
435 &self.connection_pool
436 }
437}
438
439impl fmt::Debug for ClientFactoryAsync {
440 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
441 f.debug_struct("ClientFactoryInternal")
442 .field("connection pool", &self.connection_pool)
443 .field("client config,", &self.config)
444 .finish()
445 }
446}