datafusion_spark/function/url/parse_url.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::{
22    Array, ArrayRef, GenericStringBuilder, LargeStringArray, StringArray, StringArrayType,
23};
24use arrow::datatypes::DataType;
25use datafusion_common::cast::{
26    as_large_string_array, as_string_array, as_string_view_array,
27};
28use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result};
29use datafusion_expr::{
30    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
31    Volatility,
32};
33use datafusion_functions::utils::make_scalar_function;
34use url::Url;
35
36#[derive(Debug, PartialEq, Eq, Hash)]
37pub struct ParseUrl {
38    signature: Signature,
39}
40
41impl Default for ParseUrl {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl ParseUrl {
48    pub fn new() -> Self {
49        Self {
50            signature: Signature::one_of(
51                vec![
52                    TypeSignature::Uniform(
53                        1,
54                        vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8],
55                    ),
56                    TypeSignature::Uniform(
57                        2,
58                        vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8],
59                    ),
60                    TypeSignature::Uniform(
61                        3,
62                        vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8],
63                    ),
64                ],
65                Volatility::Immutable,
66            ),
67        }
68    }
69    /// Parses a URL and extracts the specified component.
70    ///
71    /// This function takes a URL string and extracts different parts of it based on the
72    /// `part` parameter. For query parameters, an optional `key` can be specified to
73    /// extract a specific query parameter value.
74    ///
75    /// # Arguments
76    ///
77    /// * `value` - The URL string to parse
78    /// * `part` - The component of the URL to extract. Valid values are:
79    ///   - `"HOST"` - The hostname (e.g., "example.com")
80    ///   - `"PATH"` - The path portion (e.g., "/path/to/resource")
81    ///   - `"QUERY"` - The query string or a specific query parameter
82    ///   - `"REF"` - The fragment/anchor (the part after #)
83    ///   - `"PROTOCOL"` - The URL scheme (e.g., "https", "http")
84    ///   - `"FILE"` - The path with query string (e.g., "/path?query=value")
85    ///   - `"AUTHORITY"` - The authority component (host:port)
86    ///   - `"USERINFO"` - The user information (username:password)
87    /// * `key` - Optional parameter used only with `"QUERY"`. When provided, extracts
88    ///   the value of the specific query parameter with this key name.
89    ///
90    /// # Returns
91    ///
92    /// * `Ok(Some(String))` - The extracted URL component as a string
93    /// * `Ok(None)` - If the requested component doesn't exist or is empty
94    /// * `Err(DataFusionError)` - If the URL is malformed and cannot be parsed
95    ///
96    fn parse(value: &str, part: &str, key: Option<&str>) -> Result<Option<String>> {
97        Url::parse(value)
98            .map_err(|e| exec_datafusion_err!("{e:?}"))
99            .map(|url| match part {
100                "HOST" => url.host_str().map(String::from),
101                "PATH" => Some(url.path().to_string()),
102                "QUERY" => match key {
103                    None => url.query().map(String::from),
104                    Some(key) => url
105                        .query_pairs()
106                        .find(|(k, _)| k == key)
107                        .map(|(_, v)| v.into_owned()),
108                },
109                "REF" => url.fragment().map(String::from),
110                "PROTOCOL" => Some(url.scheme().to_string()),
111                "FILE" => {
112                    let path = url.path();
113                    match url.query() {
114                        Some(query) => Some(format!("{path}?{query}")),
115                        None => Some(path.to_string()),
116                    }
117                }
118                "AUTHORITY" => Some(url.authority().to_string()),
119                "USERINFO" => {
120                    let username = url.username();
121                    if username.is_empty() {
122                        return None;
123                    }
124                    match url.password() {
125                        Some(password) => Some(format!("{username}:{password}")),
126                        None => Some(username.to_string()),
127                    }
128                }
129                _ => None,
130            })
131    }
132}
133
134impl ScalarUDFImpl for ParseUrl {
135    fn as_any(&self) -> &dyn Any {
136        self
137    }
138
139    fn name(&self) -> &str {
140        "parse_url"
141    }
142
143    fn signature(&self) -> &Signature {
144        &self.signature
145    }
146
147    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
148        if arg_types.len() < 2 || arg_types.len() > 3 {
149            return plan_err!(
150                "{} expects 2 or 3 arguments, but got {}",
151                self.name(),
152                arg_types.len()
153            );
154        }
155        match arg_types.len() {
156            2 | 3 => {
157                if arg_types
158                    .iter()
159                    .any(|arg| matches!(arg, DataType::LargeUtf8))
160                {
161                    Ok(DataType::LargeUtf8)
162                } else if arg_types
163                    .iter()
164                    .any(|arg| matches!(arg, DataType::Utf8View))
165                {
166                    Ok(DataType::Utf8View)
167                } else {
168                    Ok(DataType::Utf8)
169                }
170            }
171            _ => plan_err!(
172                "`{}` expects 2 or 3 arguments, got {}",
173                &self.name(),
174                arg_types.len()
175            ),
176        }
177    }
178
179    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
180        let ScalarFunctionArgs { args, .. } = args;
181        make_scalar_function(spark_parse_url, vec![])(&args)
182    }
183}
184
185/// Core implementation of URL parsing function.
186///
187/// # Arguments
188///
189/// * `args` - A slice of ArrayRef containing the input arrays:
190///   - `args[0]` - URL array: The URLs to parse
191///   - `args[1]` - Part array: The URL components to extract (HOST, PATH, QUERY, etc.)
192///   - `args[2]` - Key array (optional): For QUERY part, the specific parameter names to extract
193///
194/// # Return Value
195///
196/// Returns `Result<ArrayRef>` containing:
197/// - A string array with extracted URL components
198/// - `None` values where extraction failed or component doesn't exist
199/// - The output array type (StringArray or LargeStringArray) is determined by input types
200///
201fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> {
202    if args.len() < 2 || args.len() > 3 {
203        return exec_err!(
204            "{} expects 2 or 3 arguments, but got {}",
205            "`parse_url`",
206            args.len()
207        );
208    }
209    // Required arguments
210    let url = &args[0];
211    let part = &args[1];
212
213    let result = if args.len() == 3 {
214        let key = &args[2];
215
216        match (url.data_type(), part.data_type(), key.data_type()) {
217            (DataType::Utf8, DataType::Utf8, DataType::Utf8) => {
218                process_parse_url::<_, _, _, StringArray>(
219                    as_string_array(url)?,
220                    as_string_array(part)?,
221                    as_string_array(key)?,
222                )
223            }
224            (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => {
225                process_parse_url::<_, _, _, StringArray>(
226                    as_string_view_array(url)?,
227                    as_string_view_array(part)?,
228                    as_string_view_array(key)?,
229                )
230            }
231            (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => {
232                process_parse_url::<_, _, _, LargeStringArray>(
233                    as_large_string_array(url)?,
234                    as_large_string_array(part)?,
235                    as_large_string_array(key)?,
236                )
237            }
238            _ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args),
239        }
240    } else {
241        // The 'key' argument is omitted, assume all values are null
242        // Create 'null' string array for 'key' argument
243        let mut builder: GenericStringBuilder<i32> = GenericStringBuilder::new();
244        for _ in 0..args[0].len() {
245            builder.append_null();
246        }
247        let key = builder.finish();
248
249        match (url.data_type(), part.data_type()) {
250            (DataType::Utf8, DataType::Utf8) => {
251                process_parse_url::<_, _, _, StringArray>(
252                    as_string_array(url)?,
253                    as_string_array(part)?,
254                    &key,
255                )
256            }
257            (DataType::Utf8View, DataType::Utf8View) => {
258                process_parse_url::<_, _, _, StringArray>(
259                    as_string_view_array(url)?,
260                    as_string_view_array(part)?,
261                    &key,
262                )
263            }
264            (DataType::LargeUtf8, DataType::LargeUtf8) => {
265                process_parse_url::<_, _, _, LargeStringArray>(
266                    as_large_string_array(url)?,
267                    as_large_string_array(part)?,
268                    &key,
269                )
270            }
271            _ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args),
272        }
273    };
274    result
275}
276
277fn process_parse_url<'a, A, B, C, T>(
278    url_array: &'a A,
279    part_array: &'a B,
280    key_array: &'a C,
281) -> Result<ArrayRef>
282where
283    &'a A: StringArrayType<'a>,
284    &'a B: StringArrayType<'a>,
285    &'a C: StringArrayType<'a>,
286    T: Array + FromIterator<Option<String>> + 'static,
287{
288    url_array
289        .iter()
290        .zip(part_array.iter())
291        .zip(key_array.iter())
292        .map(|((url, part), key)| {
293            if let (Some(url), Some(part), key) = (url, part, key) {
294                ParseUrl::parse(url, part, key)
295            } else {
296                Ok(None)
297            }
298        })
299        .collect::<Result<T>>()
300        .map(|array| Arc::new(array) as ArrayRef)
301}