azure_core 0.30.1

Rust wrappers around Microsoft Azure REST APIs - Core crate
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

use super::policies::ClientRequestIdPolicy;
use crate::{
    error::CheckSuccessOptions,
    http::{
        check_success,
        headers::{RETRY_AFTER_MS, X_MS_RETRY_AFTER_MS},
        policies::{
            Policy, PublicApiInstrumentationPolicy, RequestInstrumentationPolicy, UserAgentPolicy,
        },
        ClientOptions,
    },
};
use std::{
    any::{Any, TypeId},
    sync::Arc,
};
use typespec_client_core::http::{
    self, headers::RETRY_AFTER, policies::RetryHeaders, PipelineOptions,
};

/// Execution pipeline.
///
/// This type is a thin wrapper around the `typespec_client_core` [`http::Pipeline`].
///
/// A pipeline follows a precise flow:
///
/// 1. Client library-specified per-call policies are executed. Per-call policies can fail and bail out of the pipeline
///    immediately.
/// 2. User-specified per-call policies in [`ClientOptions::per_call_policies`] are executed.
/// 3. The retry policy is executed. It allows the policies that follow it to be re-executed.
/// 4. Client library-specified per-retry policies. Per-retry policies are always executed at least once but are
///    re-executed in case of retries.
/// 5. User-specified per-retry policies in [`ClientOptions::per_try_policies`] are executed.
/// 6. The transport policy is executed. Transport policy is always the last policy and is the policy that
///    actually constructs the [`AsyncRawResponse`](http::AsyncRawResponse) to be passed up the pipeline.
///
/// A pipeline is immutable. In other words a policy can either succeed and call the following
/// policy or fail and return to the calling policy. Arbitrary policy "skip" must be avoided (but
/// cannot be enforced by code). All policies except Transport policy can assume there is another following policy (so
/// `self.pipeline[0]` is always valid).
#[derive(Debug, Clone)]
pub struct Pipeline(http::Pipeline);

/// Options for the [`Pipeline::send`] function.
///
/// Passing `None` to [`Pipeline::send`] is equivalent to passing `PipelineSendOptions::default()`.
#[derive(Debug, Default)]
pub struct PipelineSendOptions {
    /// If true, skip all checks including [`check_success`].
    ///
    /// The derived default is `false`, so [`check_success`] runs unless this is explicitly set.
    pub skip_checks: bool,

    /// Options for [`check_success`]. If `skip_checks` is true, this field is ignored.
    pub check_success: CheckSuccessOptions,
}

/// Internal structure used to pass options to the core pipeline.
///
/// Holds the part of [`PipelineSendOptions`] that is handled by this crate rather than
/// forwarded to the wrapped [`http::Pipeline`]; produced by `PipelineSendOptions::deconstruct`.
#[derive(Debug, Default)]
struct CorePipelineSendOptions {
    // Options forwarded to `check_success` when checks are not skipped.
    check_success: CheckSuccessOptions,
    // When true, the response is returned without calling `check_success`.
    skip_checks: bool,
}

impl PipelineSendOptions {
    /// Splits these options into the part handled by this crate and the part
    /// forwarded to the wrapped [`http::Pipeline`].
    ///
    /// The forwarded part is currently always `None`.
    fn deconstruct(self) -> (CorePipelineSendOptions, Option<http::PipelineSendOptions>) {
        let Self {
            skip_checks,
            check_success,
        } = self;
        let core_options = CorePipelineSendOptions {
            check_success,
            skip_checks,
        };

        (core_options, None)
    }
}

/// Options for the [`Pipeline::stream`] function.
///
/// Passing `None` to [`Pipeline::stream`] is equivalent to passing `PipelineStreamOptions::default()`.
#[derive(Debug, Default)]
pub struct PipelineStreamOptions {
    /// If true, skip all checks including [`check_success`].
    ///
    /// The derived default is `false`, so [`check_success`] runs unless this is explicitly set.
    pub skip_checks: bool,

    /// Options for [`check_success`]. If `skip_checks` is true, this field is ignored.
    pub check_success: CheckSuccessOptions,
}

/// Internal structure used to pass options to the core pipeline.
///
/// Holds the part of [`PipelineStreamOptions`] that is handled by this crate rather than
/// forwarded to the wrapped [`http::Pipeline`]; produced by `PipelineStreamOptions::deconstruct`.
#[derive(Debug, Default)]
struct CorePipelineStreamOptions {
    // Options forwarded to `check_success` when checks are not skipped.
    check_success: CheckSuccessOptions,
    // When true, the response is returned without calling `check_success`.
    skip_checks: bool,
}

impl PipelineStreamOptions {
    /// Splits these options into the part handled by this crate and the part
    /// forwarded to the wrapped [`http::Pipeline`].
    ///
    /// The forwarded part is currently always `None`.
    fn deconstruct(
        self,
    ) -> (
        CorePipelineStreamOptions,
        Option<http::PipelineStreamOptions>,
    ) {
        let Self {
            skip_checks,
            check_success,
        } = self;
        let core_options = CorePipelineStreamOptions {
            check_success,
            skip_checks,
        };

        (core_options, None)
    }
}

impl Pipeline {
    /// Creates a new pipeline given the client library crate name and version,
    /// alone with user-specified and client library-specified policies.
    ///
    /// Crates can simply pass `option_env!("CARGO_PKG_NAME")` and `option_env!("CARGO_PKG_VERSION")` for the
    /// `crate_name` and `crate_version` arguments respectively.
    ///
    /// # Arguments
    /// * `crate_name` - The name of the crate implementing the client library.
    /// * `crate_version` - The version of the crate implementing the client library.
    /// * `options` - The client options.
    /// * `per_call_policies` - Policies to be executed per call, before the policies in `ClientOptions::per_call_policies`.
    /// * `per_try_policies` - Policies to be executed per try, before the policies in `ClientOptions::per_try_policies`.
    /// * `pipeline_options` - Additional options for the pipeline. If `None`, default options will be used.
    pub fn new(
        crate_name: Option<&'static str>,
        crate_version: Option<&'static str>,
        options: ClientOptions,
        per_call_policies: Vec<Arc<dyn Policy>>,
        per_try_policies: Vec<Arc<dyn Policy>>,
        pipeline_options: Option<PipelineOptions>,
    ) -> Self {
        let (core_client_options, options) = options.deconstruct();

        // Create a fallback tracer if no tracer provider is set.
        // This is useful for service clients that have not yet been instrumented.
        let tracer = core_client_options
            .instrumentation
            .tracer_provider
            .map(|provider| {
                // Note that the choice to use "None" as the namespace here
                // is intentional.
                // The `azure_namespace` parameter is used to populate the `az.namespace`
                // span attribute, however that information is only known by the author of the
                // client library, not the core library.
                // It is also *not* a constant that can be derived from the crate information -
                // it is a value that is determined from the list of resource providers
                // listed [here](https://learn.microsoft.com/azure/azure-resource-manager/management/azure-services-resource-providers).
                //
                // This information can only come from the package owner. It doesn't make sense
                // to burden all users of the azure_core pipeline with determining this
                // information, so we use `None` here.
                provider.get_tracer(None, crate_name.unwrap_or("Unknown"), crate_version)
            });

        let mut per_call_policies = per_call_policies.clone();
        push_unique(&mut per_call_policies, ClientRequestIdPolicy::default());
        if let Some(ref tracer) = tracer {
            let public_api_policy = PublicApiInstrumentationPolicy::new(Some(tracer.clone()));
            push_unique(&mut per_call_policies, public_api_policy);
        }

        let user_agent_policy =
            UserAgentPolicy::new(crate_name, crate_version, &core_client_options.user_agent);
        push_unique(&mut per_call_policies, user_agent_policy);

        let mut per_try_policies = per_try_policies.clone();
        if let Some(ref tracer) = tracer {
            let request_instrumentation_policy =
                RequestInstrumentationPolicy::new(Some(tracer.clone()), &options.logging);
            push_unique(&mut per_try_policies, request_instrumentation_policy);
        }

        let pipeline_options = pipeline_options.unwrap_or_else(|| PipelineOptions {
            retry_headers: RetryHeaders {
                retry_headers: vec![X_MS_RETRY_AFTER_MS, RETRY_AFTER_MS, RETRY_AFTER],
            },
            ..PipelineOptions::default()
        });

        Self(http::Pipeline::new(
            options,
            per_call_policies,
            per_try_policies,
            Some(pipeline_options),
        ))
    }

    /// Sends a [`Request`](http::Request) through each configured [`Policy`] to get a [`RawResponse`](http::RawResponse) that is processed by each policy in reverse.
    ///
    /// # Arguments
    /// * `ctx` - The context for the `Request`.
    /// * `request` - The `Request` to send.
    /// * `options` - Options for sending the `Request`, including check success options. If none, [`check_success`] will not be called.
    ///
    /// # Returns
    ///
    /// A [`http::RawResponse`] if the request was successful, or an `Error` if it failed.
    /// If the response status code indicates an HTTP error, the function will attempt to parse the error response
    /// body into an `ErrorResponse` and include it in the `Error`.
    pub async fn send(
        &self,
        ctx: &http::Context<'_>,
        request: &mut http::Request,
        options: Option<PipelineSendOptions>,
    ) -> crate::Result<http::RawResponse> {
        let (core_send_options, send_options) = options.unwrap_or_default().deconstruct();
        let result = self.0.send(ctx, request, send_options).await?;
        if !core_send_options.skip_checks {
            check_success(result, Some(core_send_options.check_success)).await
        } else {
            Ok(result)
        }
    }

    /// Sends a [`Request`](http::Request) through each configured [`Policy`] to get a [`AsyncRawResponse`](http::AsyncRawResponse) that is processed by each policy in reverse.
    ///
    /// # Arguments
    /// * `ctx` - The context for the `Request`.
    /// * `request` - The `Request` to send.
    /// * `options` - Options for sending the `Request`, including check success options. If none, [`check_success`] will not be called.
    ///
    /// # Returns
    ///
    /// A [`http::RawResponse`] if the request was successful, or an `Error` if it failed.
    /// If the response status code indicates an HTTP error, the function will attempt to parse the error response
    /// body into an `ErrorResponse` and include it in the `Error`.
    pub async fn stream(
        &self,
        ctx: &http::Context<'_>,
        request: &mut http::Request,
        options: Option<PipelineStreamOptions>,
    ) -> crate::Result<http::AsyncRawResponse> {
        let (core_stream_options, stream_options) = options.unwrap_or_default().deconstruct();
        let result = self.0.stream(ctx, request, stream_options).await?;
        if !core_stream_options.skip_checks {
            check_success(result, Some(core_stream_options.check_success)).await
        } else {
            Ok(result)
        }
    }
}

/// Appends `policy` to `policies` unless an existing entry reports the same [`TypeId`] as `T`.
///
/// NOTE(review): each element seen by the closure is a `&Arc<dyn Policy>`, so `type_id()` may
/// resolve to `Any::type_id` for the `Arc<dyn Policy>` itself rather than for the concrete policy
/// behind it. If so, the comparison never matches and the policy is always appended. Confirm
/// against the `Policy` trait definition before relying on the de-duplication.
#[inline]
fn push_unique<T: Policy + 'static>(policies: &mut Vec<Arc<dyn Policy>>, policy: T) {
    let wanted = TypeId::of::<T>();
    let already_present = policies.iter().any(|existing| existing.type_id() == wanted);
    if !already_present {
        policies.push(Arc::new(policy));
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        http::{
            headers::{self, HeaderName, Headers},
            policies::Policy,
            request::options::ClientRequestId,
            AsyncRawResponse, ClientOptions, Context, Method, Request, StatusCode, Transport,
            UserAgentOptions,
        },
        Bytes,
    };
    use azure_core_test::http::MockHttpClient;
    use futures::FutureExt as _;
    use std::sync::Arc;

    /// Builds the empty `200 OK` response every mock transport in this module returns.
    fn empty_ok_response() -> AsyncRawResponse {
        AsyncRawResponse::from_bytes(StatusCode::Ok, Headers::new(), Bytes::new())
    }

    /// Builds a pipeline for `test-crate` 1.0.0 from `options` and `per_call_policies`
    /// (with no per-try policies) and sends a single GET request through it,
    /// panicking if the pipeline reports an error.
    async fn send_get_through_pipeline(
        ctx: &Context<'_>,
        options: ClientOptions,
        per_call_policies: Vec<Arc<dyn Policy>>,
    ) {
        let pipeline = Pipeline::new(
            Some("test-crate"),
            Some("1.0.0"),
            options,
            per_call_policies,
            Vec::new(),
            None,
        );
        let mut request = Request::new("https://example.com".parse().unwrap(), Method::Get);

        pipeline
            .send(ctx, &mut request, None)
            .await
            .expect("Pipeline execution failed");
    }

    #[tokio::test]
    async fn pipeline_with_custom_client_request_id_policy() {
        // Arrange
        const CUSTOM_HEADER_NAME: &str = "x-custom-request-id";
        const CUSTOM_HEADER: HeaderName = HeaderName::from_static(CUSTOM_HEADER_NAME);
        const CLIENT_REQUEST_ID: &str = "custom-request-id";

        let mut ctx = Context::new();
        ctx.insert(ClientRequestId::new(CLIENT_REQUEST_ID.to_owned()));

        let mock_client = MockHttpClient::new(|req| {
            async {
                // Assert
                let header_value = req
                    .headers()
                    .get_optional_str(&CUSTOM_HEADER)
                    .expect("Custom header should be present");
                assert_eq!(
                    header_value, CLIENT_REQUEST_ID,
                    "Custom header value should match the client request ID"
                );

                Ok(empty_ok_response())
            }
            .boxed()
        });
        let options = ClientOptions {
            transport: Some(Transport::new(Arc::new(mock_client))),
            ..Default::default()
        };

        // A client-library-supplied request ID policy that writes to the custom header.
        let custom_policy: Arc<dyn Policy> =
            Arc::new(ClientRequestIdPolicy::with_header_name(CUSTOM_HEADER_NAME));

        // Act
        send_get_through_pipeline(&ctx, options, vec![custom_policy]).await;
    }

    #[tokio::test]
    async fn pipeline_without_client_request_id_policy() {
        // Arrange
        const CLIENT_REQUEST_ID: &str = "default-request-id";

        let mut ctx = Context::new();
        ctx.insert(ClientRequestId::new(CLIENT_REQUEST_ID.to_owned()));

        let mock_client = MockHttpClient::new(|req| {
            async {
                // Assert
                let header_value = req
                    .headers()
                    .get_optional_str(&headers::CLIENT_REQUEST_ID)
                    .expect("Default header should be present");
                assert_eq!(
                    header_value, CLIENT_REQUEST_ID,
                    "Default header value should match the client request ID"
                );

                Ok(empty_ok_response())
            }
            .boxed()
        });
        let options = ClientOptions {
            transport: Some(Transport::new(Arc::new(mock_client))),
            ..Default::default()
        };

        // Act: no ClientRequestIdPolicy is supplied by the caller.
        send_get_through_pipeline(&ctx, options, Vec::new()).await;
    }

    #[tokio::test]
    async fn pipeline_with_user_agent_enabled_default() {
        // Arrange
        let ctx = Context::new();

        let mock_client = MockHttpClient::new(|req| {
            async {
                // Assert
                let user_agent = req
                    .headers()
                    .get_optional_str(&headers::USER_AGENT)
                    .expect("User-Agent header should be present by default");
                // The default user agent format is: azsdk-rust-<crate_name>/<crate_version> (<rustc_version>; <OS>; <ARCH>)
                // Since we can't know the rustc version at runtime, just check the prefix and crate/version
                assert!(
                    user_agent.starts_with("azsdk-rust-test-crate/1.0.0 "),
                    "User-Agent header should start with expected prefix, got: {}",
                    user_agent
                );

                Ok(empty_ok_response())
            }
            .boxed()
        });
        let options = ClientOptions {
            transport: Some(Transport::new(Arc::new(mock_client))),
            ..Default::default()
        };

        // Act
        send_get_through_pipeline(&ctx, options, Vec::new()).await;
    }

    #[tokio::test]
    async fn pipeline_with_custom_application_id() {
        // Arrange
        const CUSTOM_APPLICATION_ID: &str = "my-custom-app/2.1.0";
        let ctx = Context::new();

        let mock_client = MockHttpClient::new(|req| {
            async {
                // Assert
                let user_agent = req
                    .headers()
                    .get_optional_str(&headers::USER_AGENT)
                    .expect("User-Agent header should be present");
                // The user agent should contain the custom application_id followed by the standard Azure SDK format
                // Expected format: my-custom-app/2.1.0 azsdk-rust-test-crate/1.0.0 (<rustc_version>; <OS>; <ARCH>)
                assert!(
                    user_agent.starts_with("my-custom-app/2.1.0 azsdk-rust-test-crate/1.0.0 "),
                    "User-Agent header should start with custom application_id and expected prefix, got: {}",
                    user_agent
                );

                Ok(empty_ok_response())
            }
            .boxed()
        });

        let options = ClientOptions {
            transport: Some(Transport::new(Arc::new(mock_client))),
            user_agent: UserAgentOptions {
                application_id: Some(CUSTOM_APPLICATION_ID.to_owned()),
            },
            ..Default::default()
        };

        // Act
        send_get_through_pipeline(&ctx, options, Vec::new()).await;
    }
}