Crate form_data

source ·
Expand description

form-data implemented rfc7578

Example

#![deny(warnings)]

use std::{env, net::SocketAddr};

use anyhow::Result;
use async_fs::File;
use bytes::Bytes;
use futures_util::{
    io::{copy, AsyncWriteExt},
    stream::TryStreamExt,
};
use http_body_util::Full;
use hyper::{body::Incoming, header, server::conn::http1, service::service_fn, Request, Response};
use tempfile::tempdir;
use tokio::net::TcpListener;

use form_data::{Error, FormData};

#[path = "../tests/lib/mod.rs"]
mod lib;

use lib::IncomingBody;

async fn hello(size: usize, req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Error> {
    let dir = tempdir()?;
    let mut txt = String::new();

    txt.push_str(&dir.path().to_string_lossy());
    txt.push_str("\r\n");

    let m = req
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|val| val.to_str().ok())
        .and_then(|val| val.parse::<mime::Mime>().ok())
        .ok_or(Error::InvalidHeader)?;

    let mut form = FormData::new(
        req.map(|body| IncomingBody::new(Some(body))).into_body(),
        m.get_param(mime::BOUNDARY).unwrap().as_str(),
    );

    // 512KB for hyper lager buffer
    form.set_max_buf_size(size)?;

    while let Some(mut field) = form.try_next().await? {
        let name = field.name.to_owned();
        let mut bytes: u64 = 0;

        assert_eq!(bytes as usize, field.length);

        if let Some(filename) = &field.filename {
            let filepath = dir.path().join(filename);

            match filepath.extension().and_then(|s| s.to_str()) {
                Some("txt") => {
                    // buffer <= 8KB
                    let mut writer = File::create(&filepath).await?;
                    bytes = copy(&mut field, &mut writer).await?;
                    writer.close().await?;
                }
                Some("iso") => {
                    field.ignore().await?;
                }
                _ => {
                    // 8KB <= buffer <= 512KB
                    // let mut writer = File::create(&filepath).await?;
                    // bytes = field.copy_to(&mut writer).await?;

                    let mut writer = std::fs::File::create(&filepath)?;
                    bytes = field.copy_to_file(&mut writer).await?;
                }
            }

            tracing::info!("file {} {}", name, bytes);
            txt.push_str(&format!("file {name} {bytes}\r\n"));
        } else {
            let buffer = field.bytes().await?;
            bytes = buffer.len() as u64;
            tracing::info!("text {} {}", name, bytes);
            txt.push_str(&format!("text {name} {bytes}\r\n"));
        }

        tracing::info!("{:?}", field);

        assert_eq!(
            bytes,
            match name.as_str() {
                "empty" => 0,
                "tiny1" => 7,
                "tiny0" => 122,
                "small1" => 315,
                "small0" => 1_778,
                "medium" => 13_196,
                "large" => 2_413_677,
                "book" => 400_797_393,
                "crate" => 9,
                _ => bytes,
            }
        );
    }

    dir.close()?;

    Ok(Response::new(Full::from(Into::<String>::into(txt))))
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    tracing_subscriber::fmt()
        // From env var: `RUST_LOG`
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init()
        .map_err(|e| anyhow::anyhow!(e))?;

    let mut arg = env::args()
        .find(|a| a.starts_with("--size="))
        .unwrap_or_else(|| "--size=8".to_string());

    // 512
    // 8 * 2
    // 8
    let size = arg.split_off(7).parse::<usize>().unwrap_or(8) * 1024;
    let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();

    println!("Listening on http://{addr}");
    println!("FormData max buffer size is {}KB", size / 1024);

    let listener = TcpListener::bind(addr).await?;

    loop {
        let (stream, _) = listener.accept().await?;

        tokio::task::spawn(async move {
            if let Err(err) = http1::Builder::new()
                .max_buf_size(size)
                .serve_connection(
                    stream,
                    service_fn(|req: Request<Incoming>| hello(size, req)),
                )
                .await
            {
                println!("Error serving connection: {:?}", err);
            }
        });
    }
}

Structs

Field
FormData
Various limits on incoming data
IO State

Enums

Form-data Error