use super::BODY_LIMIT;
use super::{authenticate_endpoint, parse_account_id, Caller};
use crate::{handlers::ConnectionQuery, ServerBackend, ServerState};
use axum::{
body::{to_bytes, Body},
extract::{Extension, OriginalUri, Query},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use axum_extra::{
headers::{authorization::Bearer, Authorization},
typed_header::TypedHeader,
};
use std::sync::Arc;
/// Determine if an account exists.
#[utoipa::path(
    head,
    path = "/sync/account",
    security(
        ("bearer_token" = [])
    ),
    responses(
        (
            status = StatusCode::UNAUTHORIZED,
            description = "Authorization failed.",
        ),
        (
            status = StatusCode::FORBIDDEN,
            description = "Account identifier is not allowed on this server.",
        ),
        (
            status = StatusCode::OK,
            description = "Account exists.",
        ),
        (
            status = StatusCode::NOT_FOUND,
            description = "Account does not exist.",
        ),
    ),
)]
pub(crate) async fn account_exists(
    Extension(state): Extension<ServerState>,
    Extension(backend): Extension<ServerBackend>,
    TypedHeader(bearer): TypedHeader<Authorization<Bearer>>,
    OriginalUri(uri): OriginalUri,
    headers: HeaderMap,
) -> impl IntoResponse {
    // This request carries no body, so the bytes of the request path
    // are what is handed to `authenticate_endpoint` alongside the
    // bearer token.
    let uri = uri.path().to_string();
    let account_id = parse_account_id(&headers);

    // Any authentication failure is converted straight into the response.
    let caller = match authenticate_endpoint(
        account_id,
        bearer,
        uri.as_bytes(),
        None,
        Arc::clone(&state),
        Arc::clone(&backend),
    )
    .await
    {
        Ok(caller) => caller,
        Err(error) => return error.into_response(),
    };

    // Map the boolean answer onto 200 / 404; handler errors become
    // their own response.
    match handlers::account_exists(state, backend, caller).await {
        Ok(true) => StatusCode::OK.into_response(),
        Ok(false) => StatusCode::NOT_FOUND.into_response(),
        Err(error) => error.into_response(),
    }
}
/// Create an account.
#[utoipa::path(
    put,
    path = "/sync/account",
    security(
        ("bearer_token" = [])
    ),
    request_body(
        description = "Protobuf encoded CreateSet",
        content_type = "application/octet-stream",
        content = Vec<u8>,
    ),
    responses(
        (
            status = StatusCode::UNAUTHORIZED,
            description = "Authorization failed.",
        ),
        (
            status = StatusCode::FORBIDDEN,
            description = "Account identifier is not allowed on this server.",
        ),
        (
            status = StatusCode::CONFLICT,
            description = "Account already exists.",
        ),
        (
            status = StatusCode::OK,
            description = "Account was created.",
        ),
    ),
)]
pub(crate) async fn create_account(
    Extension(state): Extension<ServerState>,
    Extension(backend): Extension<ServerBackend>,
    TypedHeader(bearer): TypedHeader<Authorization<Bearer>>,
    Query(query): Query<ConnectionQuery>,
    headers: HeaderMap,
    body: Body,
) -> impl IntoResponse {
    let account_id = parse_account_id(&headers);

    // Buffer the request body, bounded by BODY_LIMIT; a body that
    // cannot be collected is answered with 400.
    let payload = match to_bytes(body, BODY_LIMIT).await {
        Ok(payload) => payload,
        Err(_) => return StatusCode::BAD_REQUEST.into_response(),
    };

    // The raw body bytes are passed to the authentication step.
    let caller = match authenticate_endpoint(
        account_id,
        bearer,
        &payload,
        Some(query),
        Arc::clone(&state),
        Arc::clone(&backend),
    )
    .await
    {
        Ok(caller) => caller,
        Err(rejection) => return rejection.into_response(),
    };

    match handlers::create_account(state, backend, caller, payload).await {
        Ok(created) => created.into_response(),
        Err(failure) => failure.into_response(),
    }
}
/// Delete an account.
#[utoipa::path(
    delete,
    path = "/sync/account",
    security(
        ("bearer_token" = [])
    ),
    responses(
        (
            status = StatusCode::UNAUTHORIZED,
            description = "Authorization failed.",
        ),
        (
            status = StatusCode::FORBIDDEN,
            description = "Account identifier is not allowed on this server.",
        ),
        (
            status = StatusCode::NOT_FOUND,
            description = "Account does not exist.",
        ),
        (
            status = StatusCode::OK,
            description = "Account deleted.",
        ),
    ),
)]
pub(crate) async fn delete_account(
    Extension(state): Extension<ServerState>,
    Extension(backend): Extension<ServerBackend>,
    TypedHeader(bearer): TypedHeader<Authorization<Bearer>>,
    OriginalUri(uri): OriginalUri,
    headers: HeaderMap,
) -> impl IntoResponse {
    // No request body is read here, so the bytes of the request path
    // are what is handed to `authenticate_endpoint`.
    let uri = uri.path().to_string();
    let account_id = parse_account_id(&headers);

    let caller = match authenticate_endpoint(
        account_id,
        bearer,
        uri.as_bytes(),
        None,
        Arc::clone(&state),
        Arc::clone(&backend),
    )
    .await
    {
        Ok(caller) => caller,
        Err(error) => return error.into_response(),
    };

    match handlers::delete_account(state, backend, caller).await {
        Ok(result) => result.into_response(),
        Err(error) => error.into_response(),
    }
}
/// Update an account.
#[utoipa::path(
    post,
    path = "/sync/account",
    security(
        ("bearer_token" = [])
    ),
    request_body(
        description = "Protobuf encoded UpdateSet",
        content_type = "application/octet-stream",
        content = Vec<u8>,
    ),
    responses(
        (
            status = StatusCode::UNAUTHORIZED,
            description = "Authorization failed.",
        ),
        (
            status = StatusCode::FORBIDDEN,
            description = "Account identifier is not allowed on this server.",
        ),
        (
            status = StatusCode::NOT_FOUND,
            description = "Account does not exist.",
        ),
        (
            status = StatusCode::OK,
            description = "Account was updated.",
        ),
    ),
)]
pub(crate) async fn update_account(
    Extension(state): Extension<ServerState>,
    Extension(backend): Extension<ServerBackend>,
    TypedHeader(bearer): TypedHeader<Authorization<Bearer>>,
    Query(query): Query<ConnectionQuery>,
    headers: HeaderMap,
    body: Body,
) -> impl IntoResponse {
    let account_id = parse_account_id(&headers);

    // Buffer the request body, bounded by BODY_LIMIT; a body that
    // cannot be collected is answered with 400.
    let bytes = match to_bytes(body, BODY_LIMIT).await {
        Ok(bytes) => bytes,
        Err(_) => return StatusCode::BAD_REQUEST.into_response(),
    };

    // The raw body bytes are passed to the authentication step.
    let caller = match authenticate_endpoint(
        account_id,
        bearer,
        &bytes,
        Some(query),
        Arc::clone(&state),
        Arc::clone(&backend),
    )
    .await
    {
        Ok(caller) => caller,
        Err(error) => return error.into_response(),
    };

    // The inner handler decodes the body as an `UpdateSet`.
    match handlers::update_account(state, backend, caller, bytes).await {
        Ok(result) => result.into_response(),
        Err(error) => error.into_response(),
    }
}
/// Fetch an account.
#[utoipa::path(
    get,
    path = "/sync/account",
    security(
        ("bearer_token" = [])
    ),
    responses(
        (
            status = StatusCode::UNAUTHORIZED,
            description = "Authorization failed.",
        ),
        (
            status = StatusCode::FORBIDDEN,
            description = "Account identifier is not allowed on this server.",
        ),
        (
            status = StatusCode::OK,
            content_type = "application/octet-stream",
            description = "Account data sent as protobuf-encoded CreateSet.",
            body = Vec<u8>,
        ),
    ),
)]
pub(crate) async fn fetch_account(
    Extension(state): Extension<ServerState>,
    Extension(backend): Extension<ServerBackend>,
    TypedHeader(bearer): TypedHeader<Authorization<Bearer>>,
    Query(query): Query<ConnectionQuery>,
    OriginalUri(uri): OriginalUri,
    headers: HeaderMap,
) -> impl IntoResponse {
    // No request body is read here, so the bytes of the request path
    // are what is handed to `authenticate_endpoint`.
    let request_path = uri.path().to_string();
    let account_id = parse_account_id(&headers);

    let caller = match authenticate_endpoint(
        account_id,
        bearer,
        request_path.as_bytes(),
        Some(query),
        Arc::clone(&state),
        Arc::clone(&backend),
    )
    .await
    {
        Ok(caller) => caller,
        Err(rejection) => return rejection.into_response(),
    };

    match handlers::fetch_account(state, backend, caller).await {
        Ok(account) => account.into_response(),
        Err(failure) => failure.into_response(),
    }
}
/// Get account sync status.
#[utoipa::path(
    get,
    path = "/sync/account/status",
    security(
        ("bearer_token" = [])
    ),
    responses(
        (
            status = StatusCode::UNAUTHORIZED,
            description = "Authorization failed.",
        ),
        (
            status = StatusCode::FORBIDDEN,
            description = "Account identifier is not allowed on this server.",
        ),
        (
            status = StatusCode::NOT_FOUND,
            description = "Account does not exist.",
        ),
        (
            status = StatusCode::OK,
            content_type = "application/octet-stream",
            description = "Account sync status sent as protobuf-encoded SyncStatus",
            body = Vec<u8>,
        ),
    ),
)]
pub(crate) async fn sync_status(
    Extension(state): Extension<ServerState>,
    Extension(backend): Extension<ServerBackend>,
    TypedHeader(bearer): TypedHeader<Authorization<Bearer>>,
    Query(query): Query<ConnectionQuery>,
    OriginalUri(uri): OriginalUri,
    headers: HeaderMap,
) -> impl IntoResponse {
    // No request body is read here, so the bytes of the request path
    // are what is handed to `authenticate_endpoint`.
    let uri = uri.path().to_string();
    let account_id = parse_account_id(&headers);

    let caller = match authenticate_endpoint(
        account_id,
        bearer,
        uri.as_bytes(),
        Some(query),
        Arc::clone(&state),
        Arc::clone(&backend),
    )
    .await
    {
        Ok(caller) => caller,
        Err(error) => return error.into_response(),
    };

    // The inner handler reports a missing account as a 404 status error.
    match handlers::sync_status(state, backend, caller).await {
        Ok(result) => result.into_response(),
        Err(error) => error.into_response(),
    }
}
/// Scan account event logs for commit hashes.
#[utoipa::path(
    get,
    path = "/sync/account/events",
    security(
        ("bearer_token" = [])
    ),
    request_body(
        description = "Protobuf encoded ScanRequest",
        content_type = "application/octet-stream",
        content = Vec<u8>,
    ),
    responses(
        (
            status = StatusCode::UNAUTHORIZED,
            description = "Authorization failed.",
        ),
        (
            status = StatusCode::FORBIDDEN,
            description = "Account identifier is not allowed on this server.",
        ),
        (
            status = StatusCode::OK,
            content_type = "application/octet-stream",
            description = "Commit hashes sent as protobuf-encoded ScanResponse.",
            body = Vec<u8>,
        ),
    ),
)]
pub(crate) async fn event_scan(
    Extension(state): Extension<ServerState>,
    Extension(backend): Extension<ServerBackend>,
    TypedHeader(bearer): TypedHeader<Authorization<Bearer>>,
    Query(query): Query<ConnectionQuery>,
    headers: HeaderMap,
    body: Body,
) -> impl IntoResponse {
    let account_id = parse_account_id(&headers);

    // Buffer the request body, bounded by BODY_LIMIT; a body that
    // cannot be collected is answered with 400.
    let payload = match to_bytes(body, BODY_LIMIT).await {
        Ok(payload) => payload,
        Err(_) => return StatusCode::BAD_REQUEST.into_response(),
    };

    // The raw body bytes are passed to the authentication step.
    let caller = match authenticate_endpoint(
        account_id,
        bearer,
        &payload,
        Some(query),
        Arc::clone(&state),
        Arc::clone(&backend),
    )
    .await
    {
        Ok(caller) => caller,
        Err(rejection) => return rejection.into_response(),
    };

    match handlers::event_scan(state, backend, caller, payload).await {
        Ok(scan) => scan.into_response(),
        Err(failure) => failure.into_response(),
    }
}
/// Fetch a diff of commits from an account event log.
#[utoipa::path(
    post,
    path = "/sync/account/events",
    security(
        ("bearer_token" = [])
    ),
    request_body(
        description = "Protobuf encoded DiffRequest",
        content_type = "application/octet-stream",
        content = Vec<u8>,
    ),
    responses(
        (
            status = StatusCode::UNAUTHORIZED,
            description = "Authorization failed.",
        ),
        (
            status = StatusCode::FORBIDDEN,
            description = "Account identifier is not allowed on this server.",
        ),
        (
            status = StatusCode::OK,
            content_type = "application/octet-stream",
            description = "Commit diff sent as protobuf-encoded DiffResponse.",
            body = Vec<u8>,
        ),
    ),
)]
pub(crate) async fn event_diff(
    Extension(state): Extension<ServerState>,
    Extension(backend): Extension<ServerBackend>,
    TypedHeader(bearer): TypedHeader<Authorization<Bearer>>,
    Query(query): Query<ConnectionQuery>,
    headers: HeaderMap,
    body: Body,
) -> impl IntoResponse {
    let account_id = parse_account_id(&headers);

    // Buffer the request body, bounded by BODY_LIMIT; a body that
    // cannot be collected is answered with 400.
    let payload = match to_bytes(body, BODY_LIMIT).await {
        Ok(payload) => payload,
        Err(_) => return StatusCode::BAD_REQUEST.into_response(),
    };

    // The raw body bytes are passed to the authentication step.
    let caller = match authenticate_endpoint(
        account_id,
        bearer,
        &payload,
        Some(query),
        Arc::clone(&state),
        Arc::clone(&backend),
    )
    .await
    {
        Ok(caller) => caller,
        Err(rejection) => return rejection.into_response(),
    };

    match handlers::event_diff(state, backend, caller, payload).await {
        Ok(diff) => diff.into_response(),
        Err(failure) => failure.into_response(),
    }
}
/// Attempt to apply a checked patch to an account event log.
#[utoipa::path(
    patch,
    path = "/sync/account/events",
    security(
        ("bearer_token" = [])
    ),
    request_body(
        description = "Protobuf encoded PatchRequest",
        content_type = "application/octet-stream",
        content = Vec<u8>,
    ),
    responses(
        (
            status = StatusCode::UNAUTHORIZED,
            description = "Authorization failed.",
        ),
        (
            status = StatusCode::FORBIDDEN,
            description = "Account identifier is not allowed on this server.",
        ),
        (
            status = StatusCode::OK,
            content_type = "application/octet-stream",
            description = "Result of the attempt to apply the checked patch as a protobuf-encoded PatchResponse.",
            body = Vec<u8>,
        ),
    ),
)]
pub(crate) async fn event_patch(
    Extension(state): Extension<ServerState>,
    Extension(backend): Extension<ServerBackend>,
    TypedHeader(bearer): TypedHeader<Authorization<Bearer>>,
    Query(query): Query<ConnectionQuery>,
    headers: HeaderMap,
    body: Body,
) -> impl IntoResponse {
    let account_id = parse_account_id(&headers);

    // Buffer the request body, bounded by BODY_LIMIT; a body that
    // cannot be collected is answered with 400.
    let payload = match to_bytes(body, BODY_LIMIT).await {
        Ok(payload) => payload,
        Err(_) => return StatusCode::BAD_REQUEST.into_response(),
    };

    // The raw body bytes are passed to the authentication step.
    let caller = match authenticate_endpoint(
        account_id,
        bearer,
        &payload,
        Some(query),
        Arc::clone(&state),
        Arc::clone(&backend),
    )
    .await
    {
        Ok(caller) => caller,
        Err(rejection) => return rejection.into_response(),
    };

    match handlers::event_patch(state, backend, caller, payload).await {
        Ok(patched) => patched.into_response(),
        Err(failure) => failure.into_response(),
    }
}
/// Sync an account.
#[utoipa::path(
    patch,
    path = "/sync/account",
    security(
        ("bearer_token" = [])
    ),
    request_body(
        description = "Protobuf encoded SyncPacket",
        content_type = "application/octet-stream",
        content = Vec<u8>,
    ),
    responses(
        (
            status = StatusCode::UNAUTHORIZED,
            description = "Authorization failed.",
        ),
        (
            status = StatusCode::FORBIDDEN,
            description = "Account identifier is not allowed on this server.",
        ),
        (
            status = StatusCode::OK,
            content_type = "application/octet-stream",
            description = "Account event logs as protobuf-encoded SyncPacket",
            body = Vec<u8>,
        ),
    ),
)]
pub(crate) async fn sync_account(
    Extension(state): Extension<ServerState>,
    Extension(backend): Extension<ServerBackend>,
    TypedHeader(bearer): TypedHeader<Authorization<Bearer>>,
    Query(query): Query<ConnectionQuery>,
    headers: HeaderMap,
    body: Body,
) -> impl IntoResponse {
    let account_id = parse_account_id(&headers);

    // Buffer the request body, bounded by BODY_LIMIT; a body that
    // cannot be collected is answered with 400.
    let payload = match to_bytes(body, BODY_LIMIT).await {
        Ok(payload) => payload,
        Err(_) => return StatusCode::BAD_REQUEST.into_response(),
    };

    // The raw body bytes are passed to the authentication step.
    let caller = match authenticate_endpoint(
        account_id,
        bearer,
        &payload,
        Some(query),
        Arc::clone(&state),
        Arc::clone(&backend),
    )
    .await
    {
        Ok(caller) => caller,
        Err(rejection) => return rejection.into_response(),
    };

    match handlers::sync_account(state, backend, caller, payload).await {
        Ok(synced) => synced.into_response(),
        Err(failure) => failure.into_response(),
    }
}
mod handlers {
use super::Caller;
use crate::{Error, Result, ServerBackend, ServerState};
use axum::body::Bytes;
use http::{
header::{self, HeaderMap, HeaderValue},
StatusCode,
};
use sos_protocol::{
constants::MIME_TYPE_PROTOBUF, DiffRequest, PatchRequest,
ScanRequest, WireEncodeDecode,
};
use sos_server_storage::server_helpers;
use sos_sync::{CreateSet, SyncPacket, SyncStorage, UpdateSet};
use std::sync::Arc;
#[cfg(feature = "listen")]
use sos_protocol::NetworkChangeEvent;
#[cfg(feature = "listen")]
use crate::handlers::send_notification;
/// Ask the backend whether the caller's account exists.
///
/// Takes a shared (read) lock on the backend for the duration of the
/// lookup and propagates any backend error unchanged.
pub(super) async fn account_exists(
    _state: ServerState,
    backend: ServerBackend,
    caller: Caller,
) -> Result<bool> {
    let guard = backend.read().await;
    let exists = guard.account_exists(caller.account_id()).await?;
    Ok(exists)
}
/// Create the caller's account from a protobuf-encoded `CreateSet`.
///
/// # Errors
///
/// Returns `Error::Conflict` when the account already exists, and
/// propagates decode and backend errors.
pub(super) async fn create_account(
    _state: ServerState,
    backend: ServerBackend,
    caller: Caller,
    bytes: Bytes,
) -> Result<()> {
    let account_id = caller.account_id();

    // The existence check runs under a read lock which is released
    // before the payload is decoded and the write lock is taken.
    // NOTE(review): the check and the create are not atomic here;
    // confirm the backend rejects duplicate creation itself.
    {
        let guard = backend.read().await;
        let exists = guard.account_exists(account_id).await?;
        if exists {
            return Err(Error::Conflict);
        }
    }

    let create_set = CreateSet::decode(bytes).await?;
    backend
        .write()
        .await
        .create_account(account_id, create_set)
        .await?;
    Ok(())
}
/// Delete the caller's account.
///
/// Holds the backend write lock while the deletion runs; any backend
/// error is propagated to the caller.
pub(super) async fn delete_account(
    _state: ServerState,
    backend: ServerBackend,
    caller: Caller,
) -> Result<()> {
    backend
        .write()
        .await
        .delete_account(caller.account_id())
        .await?;
    Ok(())
}
/// Update the caller's account from a protobuf-encoded `UpdateSet`.
///
/// The payload is decoded before the backend write lock is acquired;
/// decode and backend errors are propagated.
pub(super) async fn update_account(
    _state: ServerState,
    backend: ServerBackend,
    caller: Caller,
    bytes: Bytes,
) -> Result<()> {
    let update_set = UpdateSet::decode(bytes).await?;
    backend
        .write()
        .await
        .update_account(caller.account_id(), update_set)
        .await?;
    Ok(())
}
/// Fetch the caller's account and encode it for the response.
///
/// Returns the encoded `CreateSet` together with headers that set the
/// content type to `MIME_TYPE_PROTOBUF`.
pub(super) async fn fetch_account(
    _state: ServerState,
    backend: ServerBackend,
    caller: Caller,
) -> Result<(HeaderMap, Vec<u8>)> {
    // The read guard stays alive until the function returns.
    let guard = backend.read().await;
    let create_set: CreateSet =
        guard.fetch_account(caller.account_id()).await?;
    let body = create_set.encode().await?;

    let mut headers = HeaderMap::new();
    headers.insert(
        header::CONTENT_TYPE,
        HeaderValue::from_static(MIME_TYPE_PROTOBUF),
    );
    Ok((headers, body))
}
pub(super) async fn sync_status(
_state: ServerState,
backend: ServerBackend,
caller: Caller,
) -> Result<(HeaderMap, Vec<u8>)> {
let reader = backend.read().await;
if !reader.account_exists(caller.account_id()).await? {
return Err(Error::Status(StatusCode::NOT_FOUND));
}
let accounts = reader.accounts();
let reader = accounts.read().await;
let account = reader.get(caller.account_id()).unwrap();
let account = account.read().await;
let status = account.sync_status().await?;
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static(MIME_TYPE_PROTOBUF),
);
Ok((headers, status.encode().await?))
}
pub(super) async fn event_scan(
_state: ServerState,
backend: ServerBackend,
caller: Caller,
bytes: Bytes,
) -> Result<(HeaderMap, Vec<u8>)> {
let account = {
let reader = backend.read().await;
let accounts = reader.accounts();
let reader = accounts.read().await;
let account = reader
.get(caller.account_id())
.ok_or_else(|| Error::NoAccount(*caller.account_id()))?;
Arc::clone(account)
};
let req = ScanRequest::decode(bytes).await?;
if req.limit > 256 {
return Err(Error::BadRequest);
}
let reader = account.read().await;
let response =
server_helpers::event_scan::<_, crate::Error>(&req, &*reader)
.await?;
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static(MIME_TYPE_PROTOBUF),
);
Ok((headers, response.encode().await?))
}
pub(super) async fn event_diff(
_state: ServerState,
backend: ServerBackend,
caller: Caller,
bytes: Bytes,
) -> Result<(HeaderMap, Vec<u8>)> {
let account = {
let reader = backend.read().await;
let accounts = reader.accounts();
let reader = accounts.read().await;
let account = reader
.get(caller.account_id())
.ok_or_else(|| Error::NoAccount(*caller.account_id()))?;
Arc::clone(account)
};
let req = DiffRequest::decode(bytes).await?;
let reader = account.read().await;
let response =
server_helpers::event_diff::<_, crate::Error>(&req, &*reader)
.await?;
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static(MIME_TYPE_PROTOBUF),
);
Ok((headers, response.encode().await?))
}
pub(super) async fn event_patch(
state: ServerState,
backend: ServerBackend,
caller: Caller,
bytes: Bytes,
) -> Result<(HeaderMap, Vec<u8>)> {
let account = {
let reader = backend.read().await;
let accounts = reader.accounts();
let reader = accounts.read().await;
let account = reader
.get(caller.account_id())
.ok_or_else(|| Error::NoAccount(*caller.account_id()))?;
Arc::clone(account)
};
let req = PatchRequest::decode(bytes).await?;
let (response, outcome) = {
let mut writer = account.write().await;
server_helpers::event_patch::<_, crate::Error>(req, &mut *writer)
.await?
};
#[cfg(feature = "listen")]
if outcome.changes > 0 {
if let Some(conn_id) = caller.connection_id() {
let reader = account.read().await;
let local_status = reader.sync_status().await?;
let notification = NetworkChangeEvent::new(
caller.account_id(),
conn_id.to_string(),
local_status.root,
outcome,
);
let reader = state.read().await;
send_notification(&*reader, &caller, notification).await;
}
}
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static(MIME_TYPE_PROTOBUF),
);
Ok((headers, response.encode().await?))
}
pub(super) async fn sync_account(
state: ServerState,
backend: ServerBackend,
caller: Caller,
bytes: Bytes,
) -> Result<(HeaderMap, Vec<u8>)> {
let account = {
let reader = backend.read().await;
let accounts = reader.accounts();
let reader = accounts.read().await;
let account = reader
.get(caller.account_id())
.ok_or_else(|| Error::NoAccount(*caller.account_id()))?;
Arc::clone(account)
};
let packet = SyncPacket::decode(bytes).await?;
let (packet, outcome) = {
let mut writer = account.write().await;
server_helpers::sync_account::<_, crate::Error>(
packet,
&mut *writer,
)
.await?
};
#[cfg(feature = "listen")]
if outcome.changes > 0 {
if let Some(conn_id) = caller.connection_id() {
let notification = NetworkChangeEvent::new(
caller.account_id(),
conn_id.to_string(),
packet.status.root,
outcome,
);
let reader = state.read().await;
send_notification(&*reader, &caller, notification).await;
}
}
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static(MIME_TYPE_PROTOBUF),
);
Ok((headers, packet.encode().await?))
}
}