use self::auth::orchestrate_auth;
use crate::client::orchestrator::endpoints::orchestrate_endpoint;
use crate::client::orchestrator::http::read_body;
use crate::client::orchestrator::phase::Phase;
use aws_smithy_http::result::SdkError;
use aws_smithy_runtime_api::client::interceptors::context::{Error, Input, Output};
use aws_smithy_runtime_api::client::interceptors::{InterceptorContext, Interceptors};
use aws_smithy_runtime_api::client::orchestrator::{
BoxError, ConfigBagAccessors, HttpRequest, HttpResponse,
};
use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
use aws_smithy_runtime_api::config_bag::ConfigBag;
use tracing::{debug_span, Instrument};
mod auth;
pub mod endpoints;
mod http;
pub(self) mod phase;
pub async fn invoke(
input: Input,
runtime_plugins: &RuntimePlugins,
) -> Result<Output, SdkError<Error, HttpResponse>> {
let mut cfg = ConfigBag::base();
let cfg = &mut cfg;
let interceptors = Interceptors::new();
cfg.put(interceptors.clone());
let context = Phase::construction(InterceptorContext::new(input))
.include(|_| runtime_plugins.apply_client_configuration(cfg))?
.include(|ctx| interceptors.client_read_before_execution(ctx, cfg))?
.include(|_| runtime_plugins.apply_operation_configuration(cfg))?
.include(|ctx| interceptors.operation_read_before_execution(ctx, cfg))?
.include(|ctx| interceptors.read_before_serialization(ctx, cfg))?
.include_mut(|ctx| interceptors.modify_before_serialization(ctx, cfg))?
.include_mut(|ctx| {
let request_serializer = cfg.request_serializer();
let request = request_serializer
.serialize_input(ctx.take_input().expect("input set at this point"))?;
ctx.set_request(request);
Result::<(), BoxError>::Ok(())
})?
.include(|ctx| interceptors.read_after_serialization(ctx, cfg))?
.include_mut(|ctx| interceptors.modify_before_retry_loop(ctx, cfg))?
.finish();
{
let retry_strategy = cfg.retry_strategy();
match retry_strategy.should_attempt_initial_request(cfg) {
Ok(_) => {}
Err(err) => return Err(Phase::dispatch(context).fail(err)),
}
}
let mut context = context;
let handling_phase = loop {
let dispatch_phase = Phase::dispatch(context);
context = make_an_attempt(dispatch_phase, cfg, &interceptors)
.await?
.include(|ctx| interceptors.read_after_attempt(ctx, cfg))?
.include_mut(|ctx| interceptors.modify_before_attempt_completion(ctx, cfg))?
.finish();
let retry_strategy = cfg.retry_strategy();
match retry_strategy.should_attempt_retry(&context, cfg) {
Ok(true) => continue,
Ok(false) => {}
Err(err) => {
return Err(Phase::response_handling(context).fail(err));
}
}
let handling_phase = Phase::response_handling(context)
.include_mut(|ctx| interceptors.modify_before_completion(ctx, cfg))?;
cfg.trace_probe().dispatch_events();
break handling_phase.include(|ctx| interceptors.read_after_execution(ctx, cfg))?;
};
handling_phase.finalize()
}
async fn make_an_attempt(
dispatch_phase: Phase,
cfg: &mut ConfigBag,
interceptors: &Interceptors<HttpRequest, HttpResponse>,
) -> Result<Phase, SdkError<Error, HttpResponse>> {
let dispatch_phase = dispatch_phase
.include(|ctx| interceptors.read_before_attempt(ctx, cfg))?
.include_mut(|ctx| orchestrate_endpoint(ctx, cfg))?
.include_mut(|ctx| interceptors.modify_before_signing(ctx, cfg))?
.include(|ctx| interceptors.read_before_signing(ctx, cfg))?;
let dispatch_phase = orchestrate_auth(dispatch_phase, cfg).await?;
let mut context = dispatch_phase
.include(|ctx| interceptors.read_after_signing(ctx, cfg))?
.include_mut(|ctx| interceptors.modify_before_transmit(ctx, cfg))?
.include(|ctx| interceptors.read_before_transmit(ctx, cfg))?
.finish();
let call_result = {
let request = context.take_request().expect("request has been set");
let connection = cfg.connection();
connection.call(request).await
};
let mut context = Phase::dispatch(context)
.include_mut(move |ctx| {
ctx.set_response(call_result?);
Result::<(), BoxError>::Ok(())
})?
.include(|ctx| interceptors.read_after_transmit(ctx, cfg))?
.include_mut(|ctx| interceptors.modify_before_deserialization(ctx, cfg))?
.include(|ctx| interceptors.read_before_deserialization(ctx, cfg))?
.finish();
let output_or_error = {
let response = context.response_mut().expect("response has been set");
let response_deserializer = cfg.response_deserializer();
match response_deserializer.deserialize_streaming(response) {
Some(output_or_error) => Ok(output_or_error),
None => read_body(response)
.instrument(debug_span!("read_body"))
.await
.map(|_| response_deserializer.deserialize_nonstreaming(response)),
}
};
Phase::response_handling(context)
.include_mut(move |ctx| {
ctx.set_output_or_error(output_or_error?);
Result::<(), BoxError>::Ok(())
})?
.include(|ctx| interceptors.read_after_deserialization(ctx, cfg))
}