// fast-cache 0.1.0
//
// Embedded-first, thread-per-core in-memory cache with an optional Redis-compatible server.
// Documentation
use super::*;

/// Namespace type grouping the constructors for the handoff configurations
/// used by the connection code in this module.
pub(super) struct HandoffConfig;

impl HandoffConfig {
    /// Builds the read-side buffer configuration from `HANDOFF_BUFFER_MAX`,
    /// with the read reserve set to `READ_CHUNK_SIZE`.
    pub(super) fn buffer() -> HandoffBufferConfig {
        let base = HandoffBufferConfig::new(HANDOFF_BUFFER_MAX);
        base.with_read_reserve(READ_CHUNK_SIZE)
    }

    /// Builds the write-side handoff configuration from
    /// `WRITE_HANDOFF_MAX_ITEMS` and `WRITE_HANDOFF_MAX_PENDING_BYTES`.
    pub(super) fn write() -> WriteHandoffConfig {
        let max_items = WRITE_HANDOFF_MAX_ITEMS;
        let max_pending_bytes = WRITE_HANDOFF_MAX_PENDING_BYTES;
        WriteHandoffConfig::new(max_items, max_pending_bytes)
    }
}

pub(super) struct ConnectionRejector;

impl ConnectionRejector {
    pub(super) async fn reject<S>(mut stream: S) -> Result<()>
    where
        S: AsyncWrite + Unpin,
    {
        let mut bytes = Vec::new();
        RespCodec::encode(
            &Frame::Error("ERR max connections reached".into()),
            &mut bytes,
        );
        stream.write_all(&bytes).await?;
        let _ = stream.shutdown().await;
        Ok(())
    }
}

/// Namespace type for the per-connection request loop.
pub(super) struct EngineConnection;

impl EngineConnection {
    /// Reads requests from `read_half`, executes them against `engine`, and
    /// submits the encoded responses through `write_handoff`.
    ///
    /// Each pass of the outer loop reads more bytes into a handoff buffer,
    /// decodes as many complete requests as are available (fast-protocol
    /// requests and RESP commands may be interleaved), collects all responses
    /// into one write buffer, and submits that buffer once.
    ///
    /// `_permit` is never used directly; it is simply kept alive until this
    /// function returns (presumably to hold a connection slot — confirm
    /// against the caller).
    ///
    /// # Errors
    ///
    /// Returns an error when reading, decoding, advancing/splitting the
    /// handoff buffer, or submitting the write buffer fails. Errors returned
    /// by the engine while executing a command do not end the loop; they are
    /// encoded as `ERR ...` responses instead.
    pub(super) async fn handle<R>(
        mut read_half: R,
        write_handoff: WriteHandoff,
        engine: EngineHandle,
        _permit: OwnedSemaphorePermit,
    ) -> Result<()>
    where
        R: AsyncRead + Unpin,
    {
        let mut frame_buffer = HandoffBuffer::with_config(HandoffConfig::buffer());

        loop {
            let read = frame_buffer
                .read_available(&mut read_half)
                .await
                .map_err(|error| {
                    crate::FastCacheError::Protocol(format!("handoff read error: {error}"))
                })?;
            // A zero-length read ends the connection cleanly
            // (presumably end-of-stream — confirm against `read_available`).
            if read == 0 {
                return Ok(());
            }

            // Responses for every request decoded in this pass are batched here.
            let mut write_buffer = Vec::with_capacity(CONNECTION_BUFFER_CAPACITY);
            // Offset into `frame_buffer.peek()` of the first byte not yet decoded.
            let mut consumed_total = 0usize;

            // Runs until no byte remains at `consumed_total` or a decoder
            // reports that it has no complete request yet.
            // NOTE(review): a decode error below returns via `?` before the
            // submit at the end of this pass, so responses already queued in
            // `write_buffer` are dropped — confirm this is intended.
            while let Some(first_byte) = frame_buffer.peek().get(consumed_total).copied() {
                if FastCodec::is_fast_request_prefix(first_byte) {
                    let slice = &frame_buffer.peek()[consumed_total..];
                    let decoded = FastCodec::decode_request(slice)?;
                    // `None` stops decoding for this pass; the bytes stay in
                    // the buffer for the next read (presumably an incomplete
                    // request).
                    let Some((request, consumed)) = decoded else {
                        break;
                    };
                    consumed_total += consumed;
                    let response = match engine.execute_fast(request).await {
                        Ok(response) => response,
                        Err(error) => FastResponse::Error(format!("ERR {error}").into_bytes()),
                    };
                    FastCodec::encode_response(&response, &mut write_buffer);
                } else {
                    // This block yields `Some(consumed)` when the command has
                    // to take the spanned path below, and `None` when it has
                    // already been handled (executed or rejected) here.
                    // Presumably scoped so that borrows of `frame_buffer` end
                    // before the buffer is mutated in the spanned path.
                    let spanned_consumed = {
                        let slice = &frame_buffer.peek()[consumed_total..];
                        let decoded = RespCodec::decode_command(slice)?;
                        let Some((frame, consumed)) = decoded else {
                            break;
                        };

                        match BorrowedCommand::from_frame(frame) {
                            Ok(command) => {
                                if EngineHandle::should_use_spanned_resp(&command) {
                                    // Not counted in `consumed_total`: the
                                    // spanned path removes these bytes itself.
                                    Some(consumed)
                                } else {
                                    consumed_total += consumed;
                                    if let Err(error) = engine
                                        .execute_resp_borrowed_into(command, &mut write_buffer)
                                        .await
                                    {
                                        RespCodec::encode(
                                            &Frame::Error(format!("ERR {error}")),
                                            &mut write_buffer,
                                        );
                                    }
                                    None
                                }
                            }
                            Err(error) => {
                                // The frame decoded but is not a valid
                                // command: skip it and reply with an error.
                                consumed_total += consumed;
                                RespCodec::encode(
                                    &Frame::Error(format!("ERR {error}")),
                                    &mut write_buffer,
                                );
                                None
                            }
                        }
                    };

                    if let Some(consumed) = spanned_consumed {
                        // The spanned path decodes from the start of the
                        // buffer, so first discard everything consumed so far
                        // and restart the loop; the same command is then
                        // decoded again at offset 0.
                        if consumed_total > 0 {
                            frame_buffer.advance(consumed_total).map_err(|error| {
                                crate::FastCacheError::Protocol(format!(
                                    "handoff advance error: {error}"
                                ))
                            })?;
                            consumed_total = 0;
                            continue;
                        }
                        let Some((span_frame, span_consumed)) =
                            RespCodec::decode_command_spans(frame_buffer.peek())?
                        else {
                            break;
                        };
                        // Both decoders must agree on the length of the command.
                        if span_consumed != consumed {
                            return Err(crate::FastCacheError::Protocol(
                                "command span length changed during handoff".into(),
                            ));
                        }
                        // Hand the command's bytes to the engine as `owner`.
                        // `consumed_total` stays 0 here, so `split_prefix` is
                        // presumably expected to remove those bytes from
                        // `frame_buffer` — confirm against `HandoffBuffer`.
                        let owner = frame_buffer.split_prefix(consumed).map_err(|error| {
                            crate::FastCacheError::Protocol(format!("handoff split error: {error}"))
                        })?;
                        if let Err(error) = engine
                            .execute_resp_spanned_into(span_frame, owner, &mut write_buffer)
                            .await
                        {
                            RespCodec::encode(
                                &Frame::Error(format!("ERR {error}")),
                                &mut write_buffer,
                            );
                        }
                    }
                }
            }

            // Submit everything produced in this pass as one write.
            if !write_buffer.is_empty() {
                WriteSubmitter::submit(&write_handoff, write_buffer).await?;
            }
            // Drop the decoded bytes; anything left over stays buffered for
            // the next read.
            if consumed_total > 0 {
                frame_buffer.advance(consumed_total).map_err(|error| {
                    crate::FastCacheError::Protocol(format!("handoff advance error: {error}"))
                })?;
            }
        }
    }
}

pub(super) struct WriteSubmitter;

impl WriteSubmitter {
    #[inline]
    pub(super) async fn submit(handoff: &WriteHandoff, buf: Vec<u8>) -> Result<()> {
        let bytes = bytes::Bytes::from(buf);
        handoff
            .write_fire_and_forget(bytes)
            .await
            .map_err(|error| {
                crate::FastCacheError::Protocol(format!("write handoff error: {error}"))
            })?;
        Ok(())
    }
}

pub(super) struct SnapshotTask;

impl SnapshotTask {
    pub(super) fn spawn(engine: EngineHandle) -> JoinHandle<()> {
        tokio::spawn(async move {
            let mut ticker = interval(engine.snapshot_interval());
            ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
            let mut last_writes = 0_u64;

            loop {
                ticker.tick().await;
                let Ok(stats) = engine.stats_snapshot().await else {
                    continue;
                };
                if stats.total_writes.saturating_sub(last_writes) < engine.snapshot_min_writes() {
                    continue;
                }
                match engine.snapshot().await {
                    Ok(path) => {
                        last_writes = stats.total_writes;
                        tracing::info!("snapshot written to {}", path.display());
                    }
                    Err(error) => {
                        tracing::warn!("snapshot attempt failed: {error}");
                    }
                }
            }
        })
    }
}